distributed_lock_redis/redlock/
acquire.rs

1//! RedLock acquire algorithm implementation.
2
3use std::time::Duration;
4
5use distributed_lock_core::error::{LockError, LockResult};
6use fred::prelude::*;
7use tokio::time::Instant;
8
9use super::helper::RedLockHelper;
10use super::timeouts::RedLockTimeouts;
11
12/// Result of a RedLock acquire operation.
13#[derive(Debug)]
14pub struct RedLockAcquireResult {
15    /// Results indexed by client position (true = success, false = failed).
16    pub acquire_results: Vec<bool>,
17}
18
19impl RedLockAcquireResult {
20    /// Creates a new acquire result.
21    pub fn new(acquire_results: Vec<bool>) -> Self {
22        Self { acquire_results }
23    }
24
25    /// Checks if the acquire was successful (majority consensus).
26    pub fn is_successful(&self, total_clients: usize) -> bool {
27        let success_count = self.acquire_results.iter().filter(|&&v| v).count();
28        RedLockHelper::has_sufficient_successes(success_count, total_clients)
29    }
30
31    /// Returns the number of successful acquisitions.
32    pub fn success_count(&self) -> usize {
33        self.acquire_results.iter().filter(|&&v| v).count()
34    }
35}
36
37/// Acquires a lock using the RedLock algorithm across multiple Redis servers.
38///
39/// The algorithm requires majority consensus: for N servers, we need at least
40/// (N/2 + 1) successful acquisitions.
41///
42/// # Arguments
43///
44/// * `try_acquire_fn` - Function that attempts to acquire the lock on a single client
45/// * `clients` - List of Redis clients to acquire on
46/// * `timeouts` - Timeout configuration
47/// * `cancel_token` - Cancellation token
48pub async fn acquire_redlock<F, Fut>(
49    try_acquire_fn: F,
50    clients: &[RedisClient],
51    timeouts: &RedLockTimeouts,
52    cancel_token: &tokio::sync::watch::Receiver<bool>,
53) -> LockResult<Option<RedLockAcquireResult>>
54where
55    F: Fn(&RedisClient) -> Fut + Send + Sync + Clone + 'static,
56    Fut: std::future::Future<Output = LockResult<bool>> + Send,
57{
58    if clients.is_empty() {
59        return Err(LockError::InvalidName(
60            "no Redis clients provided".to_string(),
61        ));
62    }
63
64    // Single client case - simpler path
65    if clients.len() == 1 {
66        return acquire_single_client(try_acquire_fn, &clients[0], timeouts, cancel_token).await;
67    }
68
69    // Multi-client RedLock algorithm
70    let acquire_timeout = timeouts.acquire_timeout();
71    let timeout_duration = acquire_timeout.as_duration();
72
73    // Start acquire attempts on all clients in parallel
74    let mut acquire_tasks: Vec<tokio::task::JoinHandle<LockResult<bool>>> = Vec::new();
75
76    for client in clients {
77        let client_clone = client.clone();
78        let try_acquire_fn_clone = try_acquire_fn.clone();
79        let task = tokio::spawn(async move { try_acquire_fn_clone(&client_clone).await });
80        acquire_tasks.push(task);
81    }
82
83    // Wait for results with timeout
84    let start = Instant::now();
85    let mut results: Vec<Option<bool>> = vec![None; clients.len()];
86    let mut success_count = 0;
87    let mut fail_count = 0;
88
89    // Wait for tasks to complete or timeout
90    loop {
91        // Check timeout
92        if let Some(timeout_dur) = timeout_duration {
93            if start.elapsed() >= timeout_dur {
94                // Timeout - return failure
95                return Ok(None);
96            }
97        }
98
99        // Check for cancellation
100        if cancel_token.has_changed().unwrap_or(false) && *cancel_token.borrow() {
101            return Err(LockError::Cancelled);
102        }
103
104        // Check completed tasks
105        for (idx, task) in acquire_tasks.iter_mut().enumerate() {
106            if results[idx].is_some() {
107                continue; // Already processed
108            }
109
110            if task.is_finished() {
111                match task.await {
112                    Ok(Ok(true)) => {
113                        results[idx] = Some(true);
114                        success_count += 1;
115                        if RedLockHelper::has_sufficient_successes(success_count, clients.len()) {
116                            // We have majority - fill remaining with false and return success
117                            for r in results.iter_mut() {
118                                if r.is_none() {
119                                    *r = Some(false);
120                                }
121                            }
122                            return Ok(Some(RedLockAcquireResult::new(
123                                results.into_iter().map(|r| r.unwrap_or(false)).collect(),
124                            )));
125                        }
126                    }
127                    Ok(Ok(false)) => {
128                        results[idx] = Some(false);
129                        fail_count += 1;
130                        if RedLockHelper::has_too_many_failures_or_faults(fail_count, clients.len())
131                        {
132                            // Can't achieve majority - return failure
133                            return Ok(None);
134                        }
135                    }
136                    Ok(Err(e)) => {
137                        // Error acquiring - treat as failure
138                        results[idx] = Some(false);
139                        fail_count += 1;
140                        if RedLockHelper::has_too_many_failures_or_faults(fail_count, clients.len())
141                        {
142                            return Err(e);
143                        }
144                    }
145                    Err(_) => {
146                        // Task panicked - treat as failure
147                        results[idx] = Some(false);
148                        fail_count += 1;
149                        if RedLockHelper::has_too_many_failures_or_faults(fail_count, clients.len())
150                        {
151                            return Ok(None);
152                        }
153                    }
154                }
155            }
156        }
157
158        // If all tasks are done, check final result
159        if results.iter().all(|r| r.is_some()) {
160            let result = RedLockAcquireResult::new(
161                results.into_iter().map(|r| r.unwrap_or(false)).collect(),
162            );
163            if result.is_successful(clients.len()) {
164                return Ok(Some(result));
165            } else {
166                return Ok(None);
167            }
168        }
169
170        // Sleep briefly before checking again
171        tokio::time::sleep(Duration::from_millis(10)).await;
172    }
173}
174
175/// Acquires a lock on a single Redis client (simpler path).
176async fn acquire_single_client<F, Fut>(
177    try_acquire_fn: F,
178    client: &RedisClient,
179    timeouts: &RedLockTimeouts,
180    cancel_token: &tokio::sync::watch::Receiver<bool>,
181) -> LockResult<Option<RedLockAcquireResult>>
182where
183    F: Fn(&RedisClient) -> Fut + Send + Sync,
184    Fut: std::future::Future<Output = LockResult<bool>> + Send,
185{
186    let acquire_timeout = timeouts.acquire_timeout();
187    let timeout_duration = acquire_timeout.as_duration();
188
189    // Check for cancellation first
190    if cancel_token.has_changed().unwrap_or(false) && *cancel_token.borrow() {
191        return Err(LockError::Cancelled);
192    }
193
194    let acquire_future = try_acquire_fn(client);
195
196    let result = if let Some(timeout_dur) = timeout_duration {
197        match tokio::time::timeout(timeout_dur, acquire_future).await {
198            Ok(Ok(true)) => true,
199            Ok(Ok(false)) => return Ok(None),
200            Ok(Err(e)) => return Err(e),
201            Err(_) => return Ok(None), // Timeout
202        }
203    } else {
204        // No timeout - wait indefinitely (but check cancellation)
205        loop {
206            let mut cancel_rx = cancel_token.clone();
207            tokio::select! {
208                result = try_acquire_fn(client) => {
209                    match result {
210                        Ok(true) => break true,
211                        Ok(false) => return Ok(None),
212                        Err(e) => return Err(e),
213                    }
214                }
215                _ = cancel_rx.changed() => {
216                    if *cancel_rx.borrow() {
217                        return Err(LockError::Cancelled);
218                    }
219                    // Continue waiting
220                }
221            }
222        }
223    };
224
225    Ok(Some(RedLockAcquireResult::new(vec![result])))
226}