distributed_lock_redis/redlock/
acquire.rs1use std::time::Duration;
4
5use distributed_lock_core::error::{LockError, LockResult};
6use fred::prelude::*;
7use tokio::time::Instant;
8
9use super::helper::RedLockHelper;
10use super::timeouts::RedLockTimeouts;
11
/// Per-client outcome of a single RedLock acquisition attempt.
///
/// Derives `Clone`, `PartialEq` and `Eq` in addition to `Debug` so results can
/// be copied and compared directly (for example in assertions).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RedLockAcquireResult {
    /// One flag per Redis client; `true` where that client's acquisition was
    /// recorded as successful.
    pub acquire_results: Vec<bool>,
}
18
19impl RedLockAcquireResult {
20 pub fn new(acquire_results: Vec<bool>) -> Self {
22 Self { acquire_results }
23 }
24
25 pub fn is_successful(&self, total_clients: usize) -> bool {
27 let success_count = self.acquire_results.iter().filter(|&&v| v).count();
28 RedLockHelper::has_sufficient_successes(success_count, total_clients)
29 }
30
31 pub fn success_count(&self) -> usize {
33 self.acquire_results.iter().filter(|&&v| v).count()
34 }
35}
36
37pub async fn acquire_redlock<F, Fut>(
49 try_acquire_fn: F,
50 clients: &[RedisClient],
51 timeouts: &RedLockTimeouts,
52 cancel_token: &tokio::sync::watch::Receiver<bool>,
53) -> LockResult<Option<RedLockAcquireResult>>
54where
55 F: Fn(&RedisClient) -> Fut + Send + Sync + Clone + 'static,
56 Fut: std::future::Future<Output = LockResult<bool>> + Send,
57{
58 if clients.is_empty() {
59 return Err(LockError::InvalidName(
60 "no Redis clients provided".to_string(),
61 ));
62 }
63
64 if clients.len() == 1 {
66 return acquire_single_client(try_acquire_fn, &clients[0], timeouts, cancel_token).await;
67 }
68
69 let acquire_timeout = timeouts.acquire_timeout();
71 let timeout_duration = acquire_timeout.as_duration();
72
73 let mut acquire_tasks: Vec<tokio::task::JoinHandle<LockResult<bool>>> = Vec::new();
75
76 for client in clients {
77 let client_clone = client.clone();
78 let try_acquire_fn_clone = try_acquire_fn.clone();
79 let task = tokio::spawn(async move { try_acquire_fn_clone(&client_clone).await });
80 acquire_tasks.push(task);
81 }
82
83 let start = Instant::now();
85 let mut results: Vec<Option<bool>> = vec![None; clients.len()];
86 let mut success_count = 0;
87 let mut fail_count = 0;
88
89 loop {
91 if let Some(timeout_dur) = timeout_duration {
93 if start.elapsed() >= timeout_dur {
94 return Ok(None);
96 }
97 }
98
99 if cancel_token.has_changed().unwrap_or(false) && *cancel_token.borrow() {
101 return Err(LockError::Cancelled);
102 }
103
104 for (idx, task) in acquire_tasks.iter_mut().enumerate() {
106 if results[idx].is_some() {
107 continue; }
109
110 if task.is_finished() {
111 match task.await {
112 Ok(Ok(true)) => {
113 results[idx] = Some(true);
114 success_count += 1;
115 if RedLockHelper::has_sufficient_successes(success_count, clients.len()) {
116 for r in results.iter_mut() {
118 if r.is_none() {
119 *r = Some(false);
120 }
121 }
122 return Ok(Some(RedLockAcquireResult::new(
123 results.into_iter().map(|r| r.unwrap_or(false)).collect(),
124 )));
125 }
126 }
127 Ok(Ok(false)) => {
128 results[idx] = Some(false);
129 fail_count += 1;
130 if RedLockHelper::has_too_many_failures_or_faults(fail_count, clients.len())
131 {
132 return Ok(None);
134 }
135 }
136 Ok(Err(e)) => {
137 results[idx] = Some(false);
139 fail_count += 1;
140 if RedLockHelper::has_too_many_failures_or_faults(fail_count, clients.len())
141 {
142 return Err(e);
143 }
144 }
145 Err(_) => {
146 results[idx] = Some(false);
148 fail_count += 1;
149 if RedLockHelper::has_too_many_failures_or_faults(fail_count, clients.len())
150 {
151 return Ok(None);
152 }
153 }
154 }
155 }
156 }
157
158 if results.iter().all(|r| r.is_some()) {
160 let result = RedLockAcquireResult::new(
161 results.into_iter().map(|r| r.unwrap_or(false)).collect(),
162 );
163 if result.is_successful(clients.len()) {
164 return Ok(Some(result));
165 } else {
166 return Ok(None);
167 }
168 }
169
170 tokio::time::sleep(Duration::from_millis(10)).await;
172 }
173}
174
175async fn acquire_single_client<F, Fut>(
177 try_acquire_fn: F,
178 client: &RedisClient,
179 timeouts: &RedLockTimeouts,
180 cancel_token: &tokio::sync::watch::Receiver<bool>,
181) -> LockResult<Option<RedLockAcquireResult>>
182where
183 F: Fn(&RedisClient) -> Fut + Send + Sync,
184 Fut: std::future::Future<Output = LockResult<bool>> + Send,
185{
186 let acquire_timeout = timeouts.acquire_timeout();
187 let timeout_duration = acquire_timeout.as_duration();
188
189 if cancel_token.has_changed().unwrap_or(false) && *cancel_token.borrow() {
191 return Err(LockError::Cancelled);
192 }
193
194 let acquire_future = try_acquire_fn(client);
195
196 let result = if let Some(timeout_dur) = timeout_duration {
197 match tokio::time::timeout(timeout_dur, acquire_future).await {
198 Ok(Ok(true)) => true,
199 Ok(Ok(false)) => return Ok(None),
200 Ok(Err(e)) => return Err(e),
201 Err(_) => return Ok(None), }
203 } else {
204 loop {
206 let mut cancel_rx = cancel_token.clone();
207 tokio::select! {
208 result = try_acquire_fn(client) => {
209 match result {
210 Ok(true) => break true,
211 Ok(false) => return Ok(None),
212 Err(e) => return Err(e),
213 }
214 }
215 _ = cancel_rx.changed() => {
216 if *cancel_rx.borrow() {
217 return Err(LockError::Cancelled);
218 }
219 }
221 }
222 }
223 };
224
225 Ok(Some(RedLockAcquireResult::new(vec![result])))
226}