#![cfg_attr(
not(any(feature = "pg", feature = "mysql", feature = "sqlite")),
allow(unused_imports, unused_variables, dead_code, unreachable_code)
)]
use std::path::PathBuf;
use std::time::{Duration, Instant};
use thiserror::Error;
use xxhash_rust::xxh3::xxh3_64;
use chrono::SecondsFormat;
use tokio::fs::File;
#[derive(Debug, Clone)]
pub struct LockConfig {
pub max_wait: Option<Duration>,
pub initial_backoff: Duration,
pub max_backoff: Duration,
pub backoff_multiplier: f64,
pub jitter_pct: f32,
pub max_attempts: Option<u32>,
}
impl Default for LockConfig {
fn default() -> Self {
Self {
max_wait: Some(Duration::from_secs(30)),
initial_backoff: Duration::from_millis(50),
max_backoff: Duration::from_secs(5),
backoff_multiplier: 1.5,
jitter_pct: 0.2,
max_attempts: None,
}
}
}
#[derive(Debug)]
enum GuardInner {
File { path: PathBuf, file: File },
}
#[derive(Debug)]
pub struct DbLockGuard {
namespaced_key: String,
inner: Option<GuardInner>, }
impl DbLockGuard {
pub fn key(&self) -> &str {
&self.namespaced_key
}
pub async fn release(mut self) {
if let Some(inner) = self.inner.take() {
unlock_inner(inner).await;
}
}
}
impl Drop for DbLockGuard {
fn drop(&mut self) {
if let Some(inner) = self.inner.take()
&& let Ok(handle) = tokio::runtime::Handle::try_current()
{
handle.spawn(async move { unlock_inner(inner).await });
}
}
}
async fn unlock_inner(inner: GuardInner) {
match inner {
GuardInner::File { path, file } => {
drop(file);
_ = tokio::fs::remove_file(&path).await;
}
}
}
pub(crate) struct LockManager {
dsn: String,
}
impl LockManager {
#[must_use]
pub fn new(dsn: String) -> Self {
Self { dsn }
}
pub async fn lock(&self, module: &str, key: &str) -> Result<DbLockGuard, DbLockError> {
let namespaced_key = format!("{module}:{key}");
self.lock_file(&namespaced_key).await
}
pub async fn try_lock(
&self,
module: &str,
key: &str,
config: LockConfig,
) -> Result<Option<DbLockGuard>, DbLockError> {
let namespaced_key = format!("{module}:{key}");
let start = Instant::now();
let mut attempt = 0u32;
let mut backoff = config.initial_backoff;
loop {
attempt += 1;
if let Some(max_attempts) = config.max_attempts
&& attempt > max_attempts
{
return Ok(None);
}
if let Some(max_wait) = config.max_wait
&& start.elapsed() >= max_wait
{
return Ok(None);
}
if let Some(guard) = self.try_acquire_once(&namespaced_key).await? {
return Ok(Some(guard));
}
let remaining = config
.max_wait
.map_or(backoff, |mw| mw.saturating_sub(start.elapsed()));
if remaining.is_zero() {
return Ok(None);
}
#[allow(clippy::cast_precision_loss)]
let jitter_factor = {
let pct = f64::from(config.jitter_pct.clamp(0.0, 1.0));
let lo = 1.0 - pct;
let hi = 1.0 + pct;
let h = xxh3_64(namespaced_key.as_bytes()) as f64;
let frac = h / u64::MAX as f64; lo + frac * (hi - lo)
};
let sleep_for = std::cmp::min(backoff, remaining);
tokio::time::sleep(sleep_for.mul_f64(jitter_factor)).await;
let next = backoff.mul_f64(config.backoff_multiplier);
backoff = std::cmp::min(next, config.max_backoff);
}
}
async fn lock_file(&self, namespaced_key: &str) -> Result<DbLockGuard, DbLockError> {
let path = self.get_lock_file_path(namespaced_key);
if let Some(parent) = path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
let file_res = tokio::fs::OpenOptions::new()
.write(true)
.create_new(true)
.open(&path)
.await;
let file = match file_res {
Ok(f) => f,
Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
return Err(DbLockError::AlreadyHeld {
lock_name: namespaced_key.to_owned(),
});
}
Err(e) => return Err(e.into()),
};
{
use tokio::io::AsyncWriteExt;
let mut f = file.try_clone().await?;
_ = f
.write_all(
format!(
"PID: {}\nKey: {}\nTimestamp: {}\n",
std::process::id(),
namespaced_key,
chrono::Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true)
)
.as_bytes(),
)
.await;
}
Ok(DbLockGuard {
namespaced_key: namespaced_key.to_owned(),
inner: Some(GuardInner::File { path, file }),
})
}
async fn try_lock_file(
&self,
namespaced_key: &str,
) -> Result<Option<DbLockGuard>, DbLockError> {
let path = self.get_lock_file_path(namespaced_key);
if let Some(parent) = path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
match tokio::fs::OpenOptions::new()
.write(true)
.create_new(true)
.open(&path)
.await
{
Ok(file) => {
{
use tokio::io::AsyncWriteExt;
let mut f = file.try_clone().await?;
_ = f
.write_all(
format!(
"PID: {}\nKey: {}\nTimestamp: {}\n",
std::process::id(),
namespaced_key,
chrono::Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true)
)
.as_bytes(),
)
.await;
}
Ok(Some(DbLockGuard {
namespaced_key: namespaced_key.to_owned(),
inner: Some(GuardInner::File { path, file }),
}))
}
Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => Ok(None),
Err(e) => Err(e.into()),
}
}
async fn try_acquire_once(
&self,
namespaced_key: &str,
) -> Result<Option<DbLockGuard>, DbLockError> {
self.try_lock_file(namespaced_key).await
}
fn get_lock_file_path(&self, namespaced_key: &str) -> PathBuf {
let base_dir = if self.dsn.contains("memdb") || cfg!(test) {
std::env::temp_dir().join("hyperspot_test_locks")
} else {
let cache = dirs::cache_dir().unwrap_or_else(std::env::temp_dir);
cache.join("hyperspot").join("locks")
};
let dsn_hash = format!("{:x}", xxh3_64(self.dsn.as_bytes()));
let key_hash = format!("{:x}", xxh3_64(namespaced_key.as_bytes()));
base_dir.join(dsn_hash).join(format!("{key_hash}.lock"))
}
}
#[derive(Error, Debug)]
pub enum DbLockError {
#[error("I/O error: {0}")]
Io(#[from] std::io::Error),
#[error("Lock already held: {lock_name}")]
AlreadyHeld { lock_name: String },
#[error("Lock not found: {lock_name}")]
NotFound { lock_name: String },
}
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
use super::*;
use anyhow::Result;
use std::sync::Arc;
#[tokio::test]
async fn test_namespaced_locks() -> Result<()> {
let lock_manager = LockManager::new("test_dsn".to_owned());
let test_id = format!(
"test_ns_{}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos()
);
let guard1 = lock_manager
.lock("module1", &format!("{test_id}_key"))
.await?;
let guard2 = lock_manager
.lock("module2", &format!("{test_id}_key"))
.await?;
assert!(!guard1.key().is_empty());
assert!(!guard2.key().is_empty());
guard1.release().await;
guard2.release().await;
Ok(())
}
#[tokio::test]
async fn test_try_lock_with_timeout() -> Result<()> {
let lock_manager = Arc::new(LockManager::new("test_dsn".to_owned()));
let test_id = format!(
"test_timeout_{}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos()
);
let _guard1 = lock_manager
.lock("test_module", &format!("{test_id}_key"))
.await?;
let config = LockConfig {
max_wait: Some(Duration::from_millis(200)),
initial_backoff: Duration::from_millis(50),
max_attempts: Some(3),
..Default::default()
};
let result = lock_manager
.try_lock("test_module", &format!("{test_id}_different_key"), config)
.await?;
assert!(result.is_some(), "expected successful lock acquisition");
Ok(())
}
#[tokio::test]
async fn test_try_lock_success() -> Result<()> {
let lock_manager = LockManager::new("test_dsn".to_owned());
let test_id = format!(
"test_success_{}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos()
);
let result = lock_manager
.try_lock(
"test_module",
&format!("{test_id}_key"),
LockConfig::default(),
)
.await?;
assert!(result.is_some(), "expected lock acquisition");
Ok(())
}
#[tokio::test]
async fn test_double_lock_same_key_errors() -> Result<()> {
let lock_manager = LockManager::new("test_dsn".to_owned());
let test_id = format!(
"test_double_{}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos()
);
let guard = lock_manager.lock("test_module", &test_id).await?;
let err = lock_manager
.lock("test_module", &test_id)
.await
.unwrap_err();
match err {
DbLockError::AlreadyHeld { lock_name } => {
assert!(lock_name.contains(&test_id));
}
other => panic!("unexpected error: {other:?}"),
}
guard.release().await;
Ok(())
}
#[tokio::test]
async fn test_try_lock_conflict_returns_none() -> Result<()> {
let lock_manager = LockManager::new("test_dsn".to_owned());
let key = format!(
"test_conflict_{}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos()
);
let _guard = lock_manager.lock("module", &key).await?;
let config = LockConfig {
max_wait: Some(Duration::from_millis(100)),
max_attempts: Some(2),
..Default::default()
};
let res = lock_manager.try_lock("module", &key, config).await?;
assert!(res.is_none());
Ok(())
}
}