distributed_lock_redis/redlock/
release.rs1use distributed_lock_core::error::{LockError, LockResult};
4use fred::prelude::*;
5
6use super::helper::RedLockHelper;
7
8pub async fn release_redlock<F, Fut>(
20 try_release_fn: F,
21 clients: &[RedisClient],
22 acquire_results: &[bool],
23) -> LockResult<()>
24where
25 F: Fn(&RedisClient) -> Fut + Send + Sync + Clone + 'static,
26 Fut: std::future::Future<Output = LockResult<()>> + Send,
27{
28 let clients_to_release: Vec<RedisClient> = acquire_results
30 .iter()
31 .enumerate()
32 .filter(|&(_, &success)| success)
33 .map(|(idx, _)| clients[idx].clone())
34 .collect();
35
36 if clients_to_release.is_empty() {
37 return Ok(()); }
39
40 let mut release_tasks: Vec<tokio::task::JoinHandle<LockResult<()>>> = Vec::new();
42
43 for client in &clients_to_release {
44 let client_clone = client.clone();
45 let try_release_fn_clone = try_release_fn.clone();
46 let task = tokio::spawn(async move { try_release_fn_clone(&client_clone).await });
47 release_tasks.push(task);
48 }
49
50 let mut success_count = 0;
52 let mut fault_count = 0;
53 let mut errors: Vec<LockError> = Vec::new();
54 let total_clients = clients_to_release.len();
55
56 for task in release_tasks {
58 match task.await {
59 Ok(Ok(())) => {
60 success_count += 1;
61 }
62 Ok(Err(e)) => {
63 fault_count += 1;
64 errors.push(e);
65 }
67 Err(_) => {
68 fault_count += 1;
70 }
71 }
72 }
73
74 if RedLockHelper::has_sufficient_successes(success_count, total_clients) {
76 Ok(())
78 } else if RedLockHelper::has_too_many_failures_or_faults(fault_count, total_clients) {
79 if errors.is_empty() {
81 Err(LockError::Backend(Box::new(std::io::Error::other(
82 "failed to release lock on majority of servers",
83 ))))
84 } else {
85 Err(errors.into_iter().next().unwrap())
87 }
88 } else {
89 Ok(())
91 }
92}