distributed_lock_redis/redlock/
release.rs

1//! RedLock release algorithm implementation.
2
3use distributed_lock_core::error::{LockError, LockResult};
4use fred::prelude::*;
5
6use super::helper::RedLockHelper;
7
8/// Releases a lock using the RedLock algorithm across multiple Redis servers.
9///
10/// Attempts to release on all clients where the lock was acquired. Requires
11/// majority consensus for successful release (though release failures are
12/// generally non-fatal).
13///
14/// # Arguments
15///
16/// * `try_release_fn` - Function that attempts to release the lock on a single client
17/// * `clients` - List of Redis clients
18/// * `acquire_results` - Previous acquire results indicating which clients hold the lock
19pub async fn release_redlock<F, Fut>(
20    try_release_fn: F,
21    clients: &[RedisClient],
22    acquire_results: &[bool],
23) -> LockResult<()>
24where
25    F: Fn(&RedisClient) -> Fut + Send + Sync + Clone + 'static,
26    Fut: std::future::Future<Output = LockResult<()>> + Send,
27{
28    // Only release on clients where we successfully acquired
29    let clients_to_release: Vec<RedisClient> = acquire_results
30        .iter()
31        .enumerate()
32        .filter(|&(_, &success)| success)
33        .map(|(idx, _)| clients[idx].clone())
34        .collect();
35
36    if clients_to_release.is_empty() {
37        return Ok(()); // Nothing to release
38    }
39
40    // Start release attempts on all clients in parallel
41    let mut release_tasks: Vec<tokio::task::JoinHandle<LockResult<()>>> = Vec::new();
42
43    for client in &clients_to_release {
44        let client_clone = client.clone();
45        let try_release_fn_clone = try_release_fn.clone();
46        let task = tokio::spawn(async move { try_release_fn_clone(&client_clone).await });
47        release_tasks.push(task);
48    }
49
50    // Wait for results
51    let mut success_count = 0;
52    let mut fault_count = 0;
53    let mut errors: Vec<LockError> = Vec::new();
54    let total_clients = clients_to_release.len();
55
56    // Wait for all tasks to complete
57    for task in release_tasks {
58        match task.await {
59            Ok(Ok(())) => {
60                success_count += 1;
61            }
62            Ok(Err(e)) => {
63                fault_count += 1;
64                errors.push(e);
65                // Continue - we want to try releasing on all clients
66            }
67            Err(_) => {
68                // Task panicked - treat as fault
69                fault_count += 1;
70            }
71        }
72    }
73
74    // Check if we achieved majority release
75    if RedLockHelper::has_sufficient_successes(success_count, total_clients) {
76        // Majority released successfully - that's good enough
77        Ok(())
78    } else if RedLockHelper::has_too_many_failures_or_faults(fault_count, total_clients) {
79        // Too many failures - return error
80        if errors.is_empty() {
81            Err(LockError::Backend(Box::new(std::io::Error::other(
82                "failed to release lock on majority of servers",
83            ))))
84        } else {
85            // Return first error
86            Err(errors.into_iter().next().unwrap())
87        }
88    } else {
89        // Some failures but not enough to be fatal - still OK
90        Ok(())
91    }
92}