use std::sync::Arc;
use std::time::Duration;
use futures::future::BoxFuture;
pub trait DLockBackend: Send + Sync + 'static {
fn try_acquire<'a>(
&'a self,
key: &'a str,
token: &'a str,
ttl_ms: u64,
) -> BoxFuture<'a, Result<Option<u64>, String>>;
fn release<'a>(&'a self, key: &'a str, token: &'a str) -> BoxFuture<'a, Result<bool, String>>;
fn renew<'a>(
&'a self,
key: &'a str,
token: &'a str,
ttl_ms: u64,
) -> BoxFuture<'a, Result<bool, String>>;
}
pub struct LockGuard {
pub fencing_token: u64,
key: String,
token: String,
backend: Arc<dyn DLockBackend>,
renew: Option<tokio::task::JoinHandle<()>>,
}
impl Drop for LockGuard {
fn drop(&mut self) {
if let Some(h) = self.renew.take() {
h.abort();
}
let (backend, key, token) = (self.backend.clone(), self.key.clone(), self.token.clone());
tokio::spawn(async move {
if let Err(e) = backend.release(&key, &token).await {
tracing::warn!(key = %key, error = %e, "lock release failed (TTL will reap)");
}
});
}
}
pub struct DistributedLock {
pub name: &'static str,
pub ttl_ms: u64,
}
impl DistributedLock {
pub const fn new(name: &'static str, ttl_ms: u64) -> Self {
Self { name, ttl_ms }
}
pub async fn try_lock(&self, backend: &Arc<dyn DLockBackend>) -> Option<LockGuard> {
let key = format!("arcly:lock:{}", self.name);
let token = random_token();
match backend.try_acquire(&key, &token, self.ttl_ms).await {
Ok(Some(fencing_token)) => {
let renew =
spawn_heartbeat(backend.clone(), key.clone(), token.clone(), self.ttl_ms);
Some(LockGuard {
fencing_token,
key,
token,
backend: backend.clone(),
renew: Some(renew),
})
}
Ok(None) => None,
Err(e) => {
metrics::counter!("dlock_backend_errors_total").increment(1);
tracing::warn!(lock = self.name, error = %e,
"lock backend unreachable — treating as not acquired");
None
}
}
}
}
fn spawn_heartbeat(
backend: Arc<dyn DLockBackend>,
key: String,
token: String,
ttl_ms: u64,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut tick = tokio::time::interval(Duration::from_millis((ttl_ms / 3).max(1)));
tick.tick().await; loop {
tick.tick().await;
match backend.renew(&key, &token, ttl_ms).await {
Ok(true) => {}
Ok(false) => {
tracing::warn!(key = %key, "lock lost — heartbeat stopping");
return;
}
Err(e) => tracing::warn!(key = %key, error = %e, "lock renew failed"),
}
}
})
}
fn random_token() -> String {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::atomic::{AtomicU64, Ordering};
static SEQ: AtomicU64 = AtomicU64::new(0);
let mut h = DefaultHasher::new();
std::time::SystemTime::now().hash(&mut h);
std::thread::current().id().hash(&mut h);
SEQ.fetch_add(1, Ordering::Relaxed).hash(&mut h);
format!("{:016x}{:016x}", h.finish(), std::process::id() as u64)
}