use std::{sync::Arc, time::Duration};
pub mod error;
pub mod providers;
use bon::Builder;
use tokio::{
join, select,
sync::Notify,
time::{interval, sleep},
};
use crate::error::DLockError;
#[cfg(feature = "dynamodb")]
pub use providers::dynamodb::{DynamodbLease, DynamodbProvider, DynamodbRetry};
pub use providers::{Lease, Provider};
#[derive(Builder)]
pub struct DLock<P>
where
P: Provider,
{
name: String,
duration: Duration,
retry: Option<Duration>,
owner: String,
provider: P,
}
impl<A> DLock<A>
where
A: Provider,
{
pub async fn acquire(&self) -> Result<A::L, DLockError<A::R>> {
self.provider
.acquire(&self.name, &self.owner, &self.duration, None)
.await
}
pub async fn retry(&self, retry: A::R) -> Result<A::L, DLockError<A::R>> {
self.provider
.acquire(&self.name, &self.owner, &self.duration, Some(retry))
.await
}
pub async fn with<R, F>(&self, mut f: F) -> Result<R, DLockError>
where
F: AsyncFnMut(A::T) -> R,
R: Send,
{
let mut retry = None;
let lease = loop {
match self.acquire_or_retry(retry.clone()).await {
Ok(lease) => break lease,
Err(error) => match error {
DLockError::AlreadyAcquired(r) => {
retry = Some(r);
}
DLockError::ProviderError(error) => {
return Err(DLockError::ProviderError(error));
}
DLockError::AlreadyReleased => {
retry = None;
}
},
}
sleep(self.retry.unwrap_or_default()).await
};
let token = lease.token();
let notify = Arc::new(Notify::new());
let wrapper = async || {
let result = f(token).await;
notify.notify_one();
result
};
let (result, lease) = join! {
wrapper(),
self.renew(lease, Arc::clone(¬ify)),
};
lease?.release().await?;
Ok(result)
}
async fn acquire_or_retry(&self, retry: Option<A::R>) -> Result<A::L, DLockError<A::R>> {
match retry {
Some(retry) => self.retry(retry).await,
None => self.acquire().await,
}
}
async fn renew(&self, mut lease: A::L, notify: Arc<Notify>) -> Result<A::L, DLockError> {
let mut interval = interval(self.duration / 3);
Ok(loop {
let stop = select! {
_ = interval.tick() => false,
_ = notify.notified() => true,
};
if !stop {
lease = lease.renew().await?;
} else {
break lease;
}
})
}
}