use std::time::Duration;
use distributed_lock_core::error::{LockError, LockResult};
use fred::prelude::*;
use tokio::time::Instant;
use super::helper::RedLockHelper;
use super::timeouts::RedLockTimeouts;
pub async fn extend_redlock<F, Fut>(
try_extend_fn: F,
clients: &[RedisClient],
acquire_results: &[bool],
timeouts: &RedLockTimeouts,
cancel_token: &tokio::sync::watch::Receiver<bool>,
) -> LockResult<Option<bool>>
where
F: Fn(&RedisClient) -> Fut + Send + Sync + Clone + 'static,
Fut: std::future::Future<Output = LockResult<bool>> + Send,
{
let clients_to_extend: Vec<(usize, RedisClient)> = acquire_results
.iter()
.enumerate()
.filter(|&(_, &success)| success)
.map(|(idx, _)| (idx, clients[idx].clone()))
.collect();
if clients_to_extend.is_empty() {
return Ok(Some(false)); }
let acquire_timeout = timeouts.acquire_timeout();
let timeout_duration = acquire_timeout.as_duration();
let mut extend_tasks: Vec<tokio::task::JoinHandle<LockResult<bool>>> = Vec::new();
for (_, client) in &clients_to_extend {
let client_clone = client.clone();
let try_extend_fn_clone = try_extend_fn.clone();
let task = tokio::spawn(async move { try_extend_fn_clone(&client_clone).await });
extend_tasks.push(task);
}
let start = Instant::now();
let mut success_count = 0;
let mut fail_count = 0;
let total_clients = clients_to_extend.len();
loop {
if let Some(timeout_dur) = timeout_duration
&& start.elapsed() >= timeout_dur
{
return Ok(None);
}
if cancel_token.has_changed().unwrap_or(false) && *cancel_token.borrow() {
return Err(LockError::Cancelled);
}
for task in extend_tasks.iter_mut() {
if task.is_finished() {
match task.await {
Ok(Ok(true)) => {
success_count += 1;
if RedLockHelper::has_sufficient_successes(success_count, total_clients) {
return Ok(Some(true));
}
}
Ok(Ok(false)) => {
fail_count += 1;
if RedLockHelper::has_too_many_failures_or_faults(fail_count, total_clients)
{
return Ok(Some(false));
}
}
Ok(Err(e)) => {
fail_count += 1;
if RedLockHelper::has_too_many_failures_or_faults(fail_count, total_clients)
{
return Err(e);
}
}
Err(_) => {
fail_count += 1;
if RedLockHelper::has_too_many_failures_or_faults(fail_count, total_clients)
{
return Ok(Some(false));
}
}
}
}
}
if success_count + fail_count >= total_clients {
if RedLockHelper::has_sufficient_successes(success_count, total_clients) {
return Ok(Some(true));
} else {
return Ok(Some(false));
}
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
}