1#![cfg_attr(
18 not(any(feature = "pg", feature = "mysql", feature = "sqlite")),
19 allow(unused_imports, unused_variables, dead_code, unreachable_code)
20)]
21
22use std::path::PathBuf;
23use std::time::{Duration, Instant};
24use thiserror::Error;
25use xxhash_rust::xxh3::xxh3_64;
26
27use chrono::SecondsFormat;
28
29use tokio::fs::File;
30
31#[derive(Debug, Clone)]
35pub struct LockConfig {
36 pub max_wait: Option<Duration>,
38 pub initial_backoff: Duration,
40 pub max_backoff: Duration,
42 pub backoff_multiplier: f64,
44 pub jitter_pct: f32,
46 pub max_attempts: Option<u32>,
48}
49
50impl Default for LockConfig {
51 fn default() -> Self {
52 Self {
53 max_wait: Some(Duration::from_secs(30)),
54 initial_backoff: Duration::from_millis(50),
55 max_backoff: Duration::from_secs(5),
56 backoff_multiplier: 1.5,
57 jitter_pct: 0.2,
58 max_attempts: None,
59 }
60 }
61}
62
63#[derive(Debug)]
66enum GuardInner {
67 File { path: PathBuf, file: File },
69}
70
71#[derive(Debug)]
74pub struct DbLockGuard {
75 namespaced_key: String,
76 inner: Option<GuardInner>, }
78
79impl DbLockGuard {
80 pub fn key(&self) -> &str {
82 &self.namespaced_key
83 }
84
85 pub async fn release(mut self) {
87 if let Some(inner) = self.inner.take() {
88 unlock_inner(inner).await;
89 }
90 }
92}
93
94impl Drop for DbLockGuard {
95 fn drop(&mut self) {
96 if let Some(inner) = self.inner.take()
98 && let Ok(handle) = tokio::runtime::Handle::try_current()
99 {
100 handle.spawn(async move { unlock_inner(inner).await });
101 }
102 }
106}
107
108async fn unlock_inner(inner: GuardInner) {
109 match inner {
110 GuardInner::File { path, file } => {
111 drop(file);
113 _ = tokio::fs::remove_file(&path).await;
114 }
115 }
116}
117
118pub(crate) struct LockManager {
122 dsn: String,
123}
124
125impl LockManager {
126 #[must_use]
127 pub fn new(dsn: String) -> Self {
128 Self { dsn }
129 }
130
131 pub async fn lock(&self, module: &str, key: &str) -> Result<DbLockGuard, DbLockError> {
139 let namespaced_key = format!("{module}:{key}");
140 self.lock_file(&namespaced_key).await
141 }
142
143 pub async fn try_lock(
153 &self,
154 module: &str,
155 key: &str,
156 config: LockConfig,
157 ) -> Result<Option<DbLockGuard>, DbLockError> {
158 let namespaced_key = format!("{module}:{key}");
159 let start = Instant::now();
160 let mut attempt = 0u32;
161 let mut backoff = config.initial_backoff;
162
163 loop {
164 attempt += 1;
165
166 if let Some(max_attempts) = config.max_attempts
167 && attempt > max_attempts
168 {
169 return Ok(None);
170 }
171 if let Some(max_wait) = config.max_wait
172 && start.elapsed() >= max_wait
173 {
174 return Ok(None);
175 }
176
177 if let Some(guard) = self.try_acquire_once(&namespaced_key).await? {
178 return Ok(Some(guard));
179 }
180
181 let remaining = config
183 .max_wait
184 .map_or(backoff, |mw| mw.saturating_sub(start.elapsed()));
185
186 if remaining.is_zero() {
187 return Ok(None);
188 }
189
190 #[allow(clippy::cast_precision_loss)]
191 let jitter_factor = {
192 let pct = f64::from(config.jitter_pct.clamp(0.0, 1.0));
193 let lo = 1.0 - pct;
194 let hi = 1.0 + pct;
195 let h = xxh3_64(namespaced_key.as_bytes()) as f64;
197 let frac = h / u64::MAX as f64; lo + frac * (hi - lo)
199 };
200
201 let sleep_for = std::cmp::min(backoff, remaining);
202 tokio::time::sleep(sleep_for.mul_f64(jitter_factor)).await;
203
204 let next = backoff.mul_f64(config.backoff_multiplier);
206 backoff = std::cmp::min(next, config.max_backoff);
207 }
208 }
209
210 async fn lock_file(&self, namespaced_key: &str) -> Result<DbLockGuard, DbLockError> {
213 let path = self.get_lock_file_path(namespaced_key);
214 if let Some(parent) = path.parent() {
215 tokio::fs::create_dir_all(parent).await?;
216 }
217
218 let file_res = tokio::fs::OpenOptions::new()
220 .write(true)
221 .create_new(true)
222 .open(&path)
223 .await;
224 let file = match file_res {
225 Ok(f) => f,
226 Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
227 return Err(DbLockError::AlreadyHeld {
228 lock_name: namespaced_key.to_owned(),
229 });
230 }
231 Err(e) => return Err(e.into()),
232 };
233
234 {
236 use tokio::io::AsyncWriteExt;
237 let mut f = file.try_clone().await?;
238 _ = f
239 .write_all(
240 format!(
241 "PID: {}\nKey: {}\nTimestamp: {}\n",
242 std::process::id(),
243 namespaced_key,
244 chrono::Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true)
245 )
246 .as_bytes(),
247 )
248 .await;
249 }
250
251 Ok(DbLockGuard {
252 namespaced_key: namespaced_key.to_owned(),
253 inner: Some(GuardInner::File { path, file }),
254 })
255 }
256
257 async fn try_lock_file(
258 &self,
259 namespaced_key: &str,
260 ) -> Result<Option<DbLockGuard>, DbLockError> {
261 let path = self.get_lock_file_path(namespaced_key);
262 if let Some(parent) = path.parent() {
263 tokio::fs::create_dir_all(parent).await?;
264 }
265
266 match tokio::fs::OpenOptions::new()
267 .write(true)
268 .create_new(true)
269 .open(&path)
270 .await
271 {
272 Ok(file) => {
273 {
275 use tokio::io::AsyncWriteExt;
276 let mut f = file.try_clone().await?;
277 _ = f
278 .write_all(
279 format!(
280 "PID: {}\nKey: {}\nTimestamp: {}\n",
281 std::process::id(),
282 namespaced_key,
283 chrono::Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true)
284 )
285 .as_bytes(),
286 )
287 .await;
288 }
289
290 Ok(Some(DbLockGuard {
291 namespaced_key: namespaced_key.to_owned(),
292 inner: Some(GuardInner::File { path, file }),
293 }))
294 }
295 Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => Ok(None),
296 Err(e) => Err(e.into()),
297 }
298 }
299
300 async fn try_acquire_once(
301 &self,
302 namespaced_key: &str,
303 ) -> Result<Option<DbLockGuard>, DbLockError> {
304 self.try_lock_file(namespaced_key).await
305 }
306
307 fn get_lock_file_path(&self, namespaced_key: &str) -> PathBuf {
309 let base_dir = if self.dsn.contains("memdb") || cfg!(test) {
311 std::env::temp_dir().join("hyperspot_test_locks")
312 } else {
313 let cache = dirs::cache_dir().unwrap_or_else(std::env::temp_dir);
315 cache.join("hyperspot").join("locks")
316 };
317
318 let dsn_hash = format!("{:x}", xxh3_64(self.dsn.as_bytes()));
319 let key_hash = format!("{:x}", xxh3_64(namespaced_key.as_bytes()));
320 base_dir.join(dsn_hash).join(format!("{key_hash}.lock"))
321 }
322}
323
324#[derive(Error, Debug)]
327pub enum DbLockError {
328 #[error("I/O error: {0}")]
329 Io(#[from] std::io::Error),
330
331 #[error("Lock already held: {lock_name}")]
332 AlreadyHeld { lock_name: String },
333
334 #[error("Lock not found: {lock_name}")]
335 NotFound { lock_name: String },
336}
337
338#[cfg(test)]
341#[cfg_attr(coverage_nightly, coverage(off))]
342mod tests {
343 use super::*;
344 use anyhow::Result;
345 use std::sync::Arc;
346
347 #[tokio::test]
348 async fn test_namespaced_locks() -> Result<()> {
349 let lock_manager = LockManager::new("test_dsn".to_owned());
350
351 let test_id = format!(
353 "test_ns_{}",
354 std::time::SystemTime::now()
355 .duration_since(std::time::UNIX_EPOCH)
356 .unwrap()
357 .as_nanos()
358 );
359
360 let guard1 = lock_manager
361 .lock("module1", &format!("{test_id}_key"))
362 .await?;
363 let guard2 = lock_manager
364 .lock("module2", &format!("{test_id}_key"))
365 .await?;
366
367 assert!(!guard1.key().is_empty());
368 assert!(!guard2.key().is_empty());
369
370 guard1.release().await;
371 guard2.release().await;
372 Ok(())
373 }
374
375 #[tokio::test]
376 async fn test_try_lock_with_timeout() -> Result<()> {
377 let lock_manager = Arc::new(LockManager::new("test_dsn".to_owned()));
378
379 let test_id = format!(
380 "test_timeout_{}",
381 std::time::SystemTime::now()
382 .duration_since(std::time::UNIX_EPOCH)
383 .unwrap()
384 .as_nanos()
385 );
386
387 let _guard1 = lock_manager
388 .lock("test_module", &format!("{test_id}_key"))
389 .await?;
390
391 let config = LockConfig {
393 max_wait: Some(Duration::from_millis(200)),
394 initial_backoff: Duration::from_millis(50),
395 max_attempts: Some(3),
396 ..Default::default()
397 };
398
399 let result = lock_manager
400 .try_lock("test_module", &format!("{test_id}_different_key"), config)
401 .await?;
402 assert!(result.is_some(), "expected successful lock acquisition");
403 Ok(())
404 }
405
406 #[tokio::test]
407 async fn test_try_lock_success() -> Result<()> {
408 let lock_manager = LockManager::new("test_dsn".to_owned());
409
410 let test_id = format!(
411 "test_success_{}",
412 std::time::SystemTime::now()
413 .duration_since(std::time::UNIX_EPOCH)
414 .unwrap()
415 .as_nanos()
416 );
417
418 let result = lock_manager
419 .try_lock(
420 "test_module",
421 &format!("{test_id}_key"),
422 LockConfig::default(),
423 )
424 .await?;
425 assert!(result.is_some(), "expected lock acquisition");
426 Ok(())
427 }
428
429 #[tokio::test]
430 async fn test_double_lock_same_key_errors() -> Result<()> {
431 let lock_manager = LockManager::new("test_dsn".to_owned());
432
433 let test_id = format!(
434 "test_double_{}",
435 std::time::SystemTime::now()
436 .duration_since(std::time::UNIX_EPOCH)
437 .unwrap()
438 .as_nanos()
439 );
440
441 let guard = lock_manager.lock("test_module", &test_id).await?;
442 let err = lock_manager
443 .lock("test_module", &test_id)
444 .await
445 .unwrap_err();
446 match err {
447 DbLockError::AlreadyHeld { lock_name } => {
448 assert!(lock_name.contains(&test_id));
449 }
450 other => panic!("unexpected error: {other:?}"),
451 }
452
453 guard.release().await;
454 Ok(())
455 }
456
457 #[tokio::test]
458 async fn test_try_lock_conflict_returns_none() -> Result<()> {
459 let lock_manager = LockManager::new("test_dsn".to_owned());
460
461 let key = format!(
462 "test_conflict_{}",
463 std::time::SystemTime::now()
464 .duration_since(std::time::UNIX_EPOCH)
465 .unwrap()
466 .as_nanos()
467 );
468
469 let _guard = lock_manager.lock("module", &key).await?;
470 let config = LockConfig {
471 max_wait: Some(Duration::from_millis(100)),
472 max_attempts: Some(2),
473 ..Default::default()
474 };
475 let res = lock_manager.try_lock("module", &key, config).await?;
476 assert!(res.is_none());
477 Ok(())
478 }
479}