use std::sync::Arc;
use std::time::Duration;
use distributed_lock_core::error::LockResult;
use distributed_lock_core::traits::LockHandle;
use fred::prelude::*;
use tokio::sync::watch;
use tracing::instrument;
use crate::lock::RedisLockState;
use crate::redlock::{extend::extend_redlock, release::release_redlock};
pub struct RedisLockHandle {
state: Arc<RedisLockState>,
acquire_results: Arc<Vec<bool>>,
clients: Arc<Vec<RedisClient>>,
#[allow(dead_code)]
extension_cadence: Duration,
#[allow(dead_code)]
expiry: Duration,
lost_receiver: watch::Receiver<bool>,
extension_task: tokio::task::JoinHandle<()>,
}
impl RedisLockHandle {
pub(crate) fn new(
state: RedisLockState,
acquire_results: Vec<bool>,
clients: Vec<RedisClient>,
extension_cadence: Duration,
expiry: Duration,
) -> Self {
let state = Arc::new(state);
let acquire_results = Arc::new(acquire_results);
let clients = Arc::new(clients);
let (lost_sender, lost_receiver) = watch::channel(false);
let state_clone = state.clone();
let acquire_results_clone = acquire_results.clone();
let clients_clone = clients.clone();
let extension_cadence_clone = extension_cadence;
let extension_task = tokio::spawn(async move {
let mut interval = tokio::time::interval(extension_cadence_clone);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
interval.tick().await;
if lost_sender.is_closed() {
break;
}
let (_cancel_sender, cancel_receiver) = watch::channel(false);
let state_for_extend = state_clone.clone();
match extend_redlock(
move |client| {
let state = state_for_extend.clone();
let client = client.clone();
async move { state.try_extend(&client).await }
},
&clients_clone,
&acquire_results_clone,
&state_clone.timeouts,
&cancel_receiver,
)
.await
{
Ok(Some(true)) => {
continue;
}
Ok(Some(false)) => {
let _ = lost_sender.send(true);
break;
}
Ok(None) => {
continue;
}
Err(_) => {
let _ = lost_sender.send(true);
break;
}
}
}
});
Self {
state,
acquire_results,
clients,
extension_cadence,
expiry,
lost_receiver,
extension_task,
}
}
}
impl LockHandle for RedisLockHandle {
fn lost_token(&self) -> &watch::Receiver<bool> {
&self.lost_receiver
}
#[instrument(skip(self), fields(lock.key = %self.state.key, backend = "redis"))]
async fn release(self) -> LockResult<()> {
self.extension_task.abort();
let state = self.state.clone();
let clients = self.clients.clone();
let acquire_results = self.acquire_results.clone();
release_redlock(
move |client| {
let state = state.clone();
let client = client.clone();
async move { state.try_release(&client).await }
},
&clients,
&acquire_results,
)
.await
}
}
impl Drop for RedisLockHandle {
fn drop(&mut self) {
self.extension_task.abort();
}
}