distributed_lock_redis/redlock/
extend.rs

1//! RedLock extend algorithm implementation.
2
3use std::time::Duration;
4
5use distributed_lock_core::error::{LockError, LockResult};
6use fred::prelude::*;
7use tokio::time::Instant;
8
9use super::helper::RedLockHelper;
10use super::timeouts::RedLockTimeouts;
11
12/// Extends a lock using the RedLock algorithm across multiple Redis servers.
13///
14/// Similar to acquire, requires majority consensus for successful extension.
15///
16/// # Arguments
17///
18/// * `try_extend_fn` - Function that attempts to extend the lock on a single client
19/// * `clients` - List of Redis clients where lock was acquired
20/// * `acquire_results` - Previous acquire results indicating which clients hold the lock
21/// * `timeouts` - Timeout configuration
22/// * `cancel_token` - Cancellation token
23pub async fn extend_redlock<F, Fut>(
24    try_extend_fn: F,
25    clients: &[RedisClient],
26    acquire_results: &[bool],
27    timeouts: &RedLockTimeouts,
28    cancel_token: &tokio::sync::watch::Receiver<bool>,
29) -> LockResult<Option<bool>>
30where
31    F: Fn(&RedisClient) -> Fut + Send + Sync + Clone + 'static,
32    Fut: std::future::Future<Output = LockResult<bool>> + Send,
33{
34    // Only extend on clients where we successfully acquired
35    let clients_to_extend: Vec<(usize, RedisClient)> = acquire_results
36        .iter()
37        .enumerate()
38        .filter(|&(_, &success)| success)
39        .map(|(idx, _)| (idx, clients[idx].clone()))
40        .collect();
41
42    if clients_to_extend.is_empty() {
43        return Ok(Some(false)); // No locks to extend
44    }
45
46    let acquire_timeout = timeouts.acquire_timeout();
47    let timeout_duration = acquire_timeout.as_duration();
48
49    // Start extend attempts on all clients in parallel
50    let mut extend_tasks: Vec<tokio::task::JoinHandle<LockResult<bool>>> = Vec::new();
51
52    for (_, client) in &clients_to_extend {
53        let client_clone = client.clone();
54        let try_extend_fn_clone = try_extend_fn.clone();
55        let task = tokio::spawn(async move { try_extend_fn_clone(&client_clone).await });
56        extend_tasks.push(task);
57    }
58
59    // Wait for results with timeout
60    let start = Instant::now();
61    let mut success_count = 0;
62    let mut fail_count = 0;
63    let total_clients = clients_to_extend.len();
64
65    // Wait for tasks to complete or timeout
66    loop {
67        // Check timeout
68        if let Some(timeout_dur) = timeout_duration
69            && start.elapsed() >= timeout_dur
70        {
71            // Timeout - return inconclusive
72            return Ok(None);
73        }
74
75        // Check for cancellation
76        if cancel_token.has_changed().unwrap_or(false) && *cancel_token.borrow() {
77            return Err(LockError::Cancelled);
78        }
79
80        // Check completed tasks
81        for task in extend_tasks.iter_mut() {
82            if task.is_finished() {
83                match task.await {
84                    Ok(Ok(true)) => {
85                        success_count += 1;
86                        if RedLockHelper::has_sufficient_successes(success_count, total_clients) {
87                            // We have majority - return success
88                            return Ok(Some(true));
89                        }
90                    }
91                    Ok(Ok(false)) => {
92                        fail_count += 1;
93                        if RedLockHelper::has_too_many_failures_or_faults(fail_count, total_clients)
94                        {
95                            // Can't achieve majority - return failure
96                            return Ok(Some(false));
97                        }
98                    }
99                    Ok(Err(e)) => {
100                        // Error extending - treat as failure
101                        fail_count += 1;
102                        if RedLockHelper::has_too_many_failures_or_faults(fail_count, total_clients)
103                        {
104                            return Err(e);
105                        }
106                    }
107                    Err(_) => {
108                        // Task panicked - treat as failure
109                        fail_count += 1;
110                        if RedLockHelper::has_too_many_failures_or_faults(fail_count, total_clients)
111                        {
112                            return Ok(Some(false));
113                        }
114                    }
115                }
116            }
117        }
118
119        // If all tasks are done, check final result
120        if success_count + fail_count >= total_clients {
121            if RedLockHelper::has_sufficient_successes(success_count, total_clients) {
122                return Ok(Some(true));
123            } else {
124                return Ok(Some(false));
125            }
126        }
127
128        // Sleep briefly before checking again
129        tokio::time::sleep(Duration::from_millis(10)).await;
130    }
131}