distributed_lock_redis/redlock/
extend.rs1use std::time::Duration;
4
5use distributed_lock_core::error::{LockError, LockResult};
6use fred::prelude::*;
7use tokio::time::Instant;
8
9use super::helper::RedLockHelper;
10use super::timeouts::RedLockTimeouts;
11
12pub async fn extend_redlock<F, Fut>(
24 try_extend_fn: F,
25 clients: &[RedisClient],
26 acquire_results: &[bool],
27 timeouts: &RedLockTimeouts,
28 cancel_token: &tokio::sync::watch::Receiver<bool>,
29) -> LockResult<Option<bool>>
30where
31 F: Fn(&RedisClient) -> Fut + Send + Sync + Clone + 'static,
32 Fut: std::future::Future<Output = LockResult<bool>> + Send,
33{
34 let clients_to_extend: Vec<(usize, RedisClient)> = acquire_results
36 .iter()
37 .enumerate()
38 .filter(|&(_, &success)| success)
39 .map(|(idx, _)| (idx, clients[idx].clone()))
40 .collect();
41
42 if clients_to_extend.is_empty() {
43 return Ok(Some(false)); }
45
46 let acquire_timeout = timeouts.acquire_timeout();
47 let timeout_duration = acquire_timeout.as_duration();
48
49 let mut extend_tasks: Vec<tokio::task::JoinHandle<LockResult<bool>>> = Vec::new();
51
52 for (_, client) in &clients_to_extend {
53 let client_clone = client.clone();
54 let try_extend_fn_clone = try_extend_fn.clone();
55 let task = tokio::spawn(async move { try_extend_fn_clone(&client_clone).await });
56 extend_tasks.push(task);
57 }
58
59 let start = Instant::now();
61 let mut success_count = 0;
62 let mut fail_count = 0;
63 let total_clients = clients_to_extend.len();
64
65 loop {
67 if let Some(timeout_dur) = timeout_duration
69 && start.elapsed() >= timeout_dur
70 {
71 return Ok(None);
73 }
74
75 if cancel_token.has_changed().unwrap_or(false) && *cancel_token.borrow() {
77 return Err(LockError::Cancelled);
78 }
79
80 for task in extend_tasks.iter_mut() {
82 if task.is_finished() {
83 match task.await {
84 Ok(Ok(true)) => {
85 success_count += 1;
86 if RedLockHelper::has_sufficient_successes(success_count, total_clients) {
87 return Ok(Some(true));
89 }
90 }
91 Ok(Ok(false)) => {
92 fail_count += 1;
93 if RedLockHelper::has_too_many_failures_or_faults(fail_count, total_clients)
94 {
95 return Ok(Some(false));
97 }
98 }
99 Ok(Err(e)) => {
100 fail_count += 1;
102 if RedLockHelper::has_too_many_failures_or_faults(fail_count, total_clients)
103 {
104 return Err(e);
105 }
106 }
107 Err(_) => {
108 fail_count += 1;
110 if RedLockHelper::has_too_many_failures_or_faults(fail_count, total_clients)
111 {
112 return Ok(Some(false));
113 }
114 }
115 }
116 }
117 }
118
119 if success_count + fail_count >= total_clients {
121 if RedLockHelper::has_sufficient_successes(success_count, total_clients) {
122 return Ok(Some(true));
123 } else {
124 return Ok(Some(false));
125 }
126 }
127
128 tokio::time::sleep(Duration::from_millis(10)).await;
130 }
131}