Skip to main content

modkit_db/
advisory_locks.rs

1//! Advisory locks implementation with namespacing and retry policies.
2//!
3//! Cross-database advisory locking with proper namespacing and configurable
4//! retry/backoff.
5//!
6//! ## Security policy
7//! This crate forbids plain SQL outside migration infrastructure. Therefore, locks
8//! are implemented **purely as file-based locks** (no DB-native advisory locks).
9//!
10//! Notes:
11//! - Prefer calling `guard.release().await` for deterministic unlock;
12//!   `Drop` provides best-effort cleanup only (may be skipped on runtime shutdown).
13//! - File-based locks use `create_new(true)` semantics and keep the file open,
14//!   then remove it on release. Consider using `fs2::FileExt::try_lock_exclusive()`
15//!   if you want kernel-level advisory locks across processes.
16
17#![cfg_attr(
18    not(any(feature = "pg", feature = "mysql", feature = "sqlite")),
19    allow(unused_imports, unused_variables, dead_code, unreachable_code)
20)]
21
22use std::path::PathBuf;
23use std::time::{Duration, Instant};
24use thiserror::Error;
25use xxhash_rust::xxh3::xxh3_64;
26
27use chrono::SecondsFormat;
28
29use tokio::fs::File;
30
31// --------------------------- Config ------------------------------------------
32
/// Retry policy consumed by `LockManager::try_lock`.
///
/// Typically built with struct-update syntax, e.g.
/// `LockConfig { max_attempts: Some(3), ..Default::default() }`.
#[derive(Clone, Debug)]
pub struct LockConfig {
    /// Overall time budget for acquiring the lock; `None` never times out.
    pub max_wait: Option<Duration>,
    /// Base delay applied after the first failed attempt.
    pub initial_backoff: Duration,
    /// Upper bound the delay is clamped to as it grows.
    pub max_backoff: Duration,
    /// Factor the delay is multiplied by after every failed attempt.
    pub backoff_multiplier: f64,
    /// Relative jitter, clamped to `[0.0, 1.0]`; `0.2` scales each delay by a
    /// factor somewhere in `[0.8, 1.2]`.
    pub jitter_pct: f32,
    /// Cap on the number of acquisition attempts; `None` means no cap.
    pub max_attempts: Option<u32>,
}

impl Default for LockConfig {
    /// 30 s overall wait, 50 ms initial delay growing x1.5 up to 5 s,
    /// +/-20 % jitter, unlimited attempts.
    fn default() -> Self {
        let initial_backoff = Duration::from_millis(50);
        let max_backoff = Duration::from_millis(5_000);
        Self {
            max_wait: Some(Duration::from_millis(30_000)),
            initial_backoff,
            max_backoff,
            backoff_multiplier: 1.5,
            jitter_pct: 0.2,
            max_attempts: None,
        }
    }
}
62
63/* --------------------------- Guard ------------------------------------------- */
64
/// State owned by a held `DbLockGuard`; consumed by `unlock_inner` on release.
#[derive(Debug)]
enum GuardInner {
    /// File-based lock: `path` is the marker file created with `create_new`
    /// semantics, and `file` is its open handle, kept open until release.
    File { path: PathBuf, file: File },
}
70
71/// Database lock guard that can release lock explicitly via `release()`.
72/// `Drop` provides best-effort cleanup if you forget to call `release()`.
73#[derive(Debug)]
74pub struct DbLockGuard {
75    namespaced_key: String,
76    inner: Option<GuardInner>, // Option to allow moving inner out in Drop
77}
78
79impl DbLockGuard {
80    /// Lock key with module namespace ("module:key").
81    pub fn key(&self) -> &str {
82        &self.namespaced_key
83    }
84
85    /// Deterministically release the lock (preferred).
86    pub async fn release(mut self) {
87        if let Some(inner) = self.inner.take() {
88            unlock_inner(inner).await;
89        }
90        // drop self
91    }
92}
93
94impl Drop for DbLockGuard {
95    fn drop(&mut self) {
96        // Best-effort async unlock if runtime is alive and inner still present.
97        if let Some(inner) = self.inner.take()
98            && let Ok(handle) = tokio::runtime::Handle::try_current()
99        {
100            handle.spawn(async move { unlock_inner(inner).await });
101        }
102        // else: No runtime or no inner; we cannot perform async cleanup here.
103        // The lock may remain held until process exit (DB connection)
104        // or lock file may remain on disk. Prefer calling `release().await`.
105    }
106}
107
108async fn unlock_inner(inner: GuardInner) {
109    match inner {
110        GuardInner::File { path, file } => {
111            // Close file first, then try to remove marker. Ignore errors.
112            drop(file);
113            _ = tokio::fs::remove_file(&path).await;
114        }
115    }
116}
117
118// --------------------------- Lock Manager ------------------------------------
119
/// Internal lock manager.
///
/// All locks are file-based (see the module docs). `dsn` does not select a
/// database backend: it is only used to choose and namespace the lock
/// directory. `get_lock_file_path` hashes it into the path, so distinct DSNs
/// (barring a hash collision) get separate lock directories.
pub(crate) struct LockManager {
    /// Connection string; hashed to namespace lock files per database.
    dsn: String,
}
124
125impl LockManager {
126    #[must_use]
127    pub fn new(dsn: String) -> Self {
128        Self { dsn }
129    }
130
131    /// Acquire an advisory lock for `{module}:{key}`.
132    ///
133    /// Returns a guard that releases the lock when dropped (best-effort) or
134    /// deterministically when `release().await` is called.
135    ///
136    /// # Errors
137    /// Returns `DbLockError` if the lock cannot be acquired.
138    pub async fn lock(&self, module: &str, key: &str) -> Result<DbLockGuard, DbLockError> {
139        let namespaced_key = format!("{module}:{key}");
140        self.lock_file(&namespaced_key).await
141    }
142
143    /// Try to acquire an advisory lock with retry/backoff policy.
144    ///
145    /// Returns:
146    /// - `Ok(Some(guard))` if lock acquired
147    /// - `Ok(None)` if timed out or attempts exceeded
148    /// - `Err(e)` on unrecoverable error
149    ///
150    /// # Errors
151    /// Returns `DbLockError` on unrecoverable lock errors.
152    pub async fn try_lock(
153        &self,
154        module: &str,
155        key: &str,
156        config: LockConfig,
157    ) -> Result<Option<DbLockGuard>, DbLockError> {
158        let namespaced_key = format!("{module}:{key}");
159        let start = Instant::now();
160        let mut attempt = 0u32;
161        let mut backoff = config.initial_backoff;
162
163        loop {
164            attempt += 1;
165
166            if let Some(max_attempts) = config.max_attempts
167                && attempt > max_attempts
168            {
169                return Ok(None);
170            }
171            if let Some(max_wait) = config.max_wait
172                && start.elapsed() >= max_wait
173            {
174                return Ok(None);
175            }
176
177            if let Some(guard) = self.try_acquire_once(&namespaced_key).await? {
178                return Ok(Some(guard));
179            }
180
181            // Sleep with jitter, capped by remaining time if any.
182            let remaining = config
183                .max_wait
184                .map_or(backoff, |mw| mw.saturating_sub(start.elapsed()));
185
186            if remaining.is_zero() {
187                return Ok(None);
188            }
189
190            #[allow(clippy::cast_precision_loss)]
191            let jitter_factor = {
192                let pct = f64::from(config.jitter_pct.clamp(0.0, 1.0));
193                let lo = 1.0 - pct;
194                let hi = 1.0 + pct;
195                // Deterministic jitter from key hash (no rand dep).
196                let h = xxh3_64(namespaced_key.as_bytes()) as f64;
197                let frac = h / u64::MAX as f64; // 0..1
198                lo + frac * (hi - lo)
199            };
200
201            let sleep_for = std::cmp::min(backoff, remaining);
202            tokio::time::sleep(sleep_for.mul_f64(jitter_factor)).await;
203
204            // Exponential backoff
205            let next = backoff.mul_f64(config.backoff_multiplier);
206            backoff = std::cmp::min(next, config.max_backoff);
207        }
208    }
209
210    // ------------------------ File helpers ----------------------
211
212    async fn lock_file(&self, namespaced_key: &str) -> Result<DbLockGuard, DbLockError> {
213        let path = self.get_lock_file_path(namespaced_key);
214        if let Some(parent) = path.parent() {
215            tokio::fs::create_dir_all(parent).await?;
216        }
217
218        // create_new semantics via tokio
219        let file_res = tokio::fs::OpenOptions::new()
220            .write(true)
221            .create_new(true)
222            .open(&path)
223            .await;
224        let file = match file_res {
225            Ok(f) => f,
226            Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
227                return Err(DbLockError::AlreadyHeld {
228                    lock_name: namespaced_key.to_owned(),
229                });
230            }
231            Err(e) => return Err(e.into()),
232        };
233
234        // Write debug info (best-effort only)
235        {
236            use tokio::io::AsyncWriteExt;
237            let mut f = file.try_clone().await?;
238            _ = f
239                .write_all(
240                    format!(
241                        "PID: {}\nKey: {}\nTimestamp: {}\n",
242                        std::process::id(),
243                        namespaced_key,
244                        chrono::Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true)
245                    )
246                    .as_bytes(),
247                )
248                .await;
249        }
250
251        Ok(DbLockGuard {
252            namespaced_key: namespaced_key.to_owned(),
253            inner: Some(GuardInner::File { path, file }),
254        })
255    }
256
257    async fn try_lock_file(
258        &self,
259        namespaced_key: &str,
260    ) -> Result<Option<DbLockGuard>, DbLockError> {
261        let path = self.get_lock_file_path(namespaced_key);
262        if let Some(parent) = path.parent() {
263            tokio::fs::create_dir_all(parent).await?;
264        }
265
266        match tokio::fs::OpenOptions::new()
267            .write(true)
268            .create_new(true)
269            .open(&path)
270            .await
271        {
272            Ok(file) => {
273                // Write debug info (best-effort only)
274                {
275                    use tokio::io::AsyncWriteExt;
276                    let mut f = file.try_clone().await?;
277                    _ = f
278                        .write_all(
279                            format!(
280                                "PID: {}\nKey: {}\nTimestamp: {}\n",
281                                std::process::id(),
282                                namespaced_key,
283                                chrono::Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true)
284                            )
285                            .as_bytes(),
286                        )
287                        .await;
288                }
289
290                Ok(Some(DbLockGuard {
291                    namespaced_key: namespaced_key.to_owned(),
292                    inner: Some(GuardInner::File { path, file }),
293                }))
294            }
295            Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => Ok(None),
296            Err(e) => Err(e.into()),
297        }
298    }
299
300    async fn try_acquire_once(
301        &self,
302        namespaced_key: &str,
303    ) -> Result<Option<DbLockGuard>, DbLockError> {
304        self.try_lock_file(namespaced_key).await
305    }
306
307    /// Generate lock file path for `SQLite` (or when using file-based locks).
308    fn get_lock_file_path(&self, namespaced_key: &str) -> PathBuf {
309        // For ephemeral DSNs (like `memdb`) or tests, use temp dir to avoid global pollution.
310        let base_dir = if self.dsn.contains("memdb") || cfg!(test) {
311            std::env::temp_dir().join("hyperspot_test_locks")
312        } else {
313            // Prefer OS cache dir; fallback to temp dir if None (e.g. in minimal containers).
314            let cache = dirs::cache_dir().unwrap_or_else(std::env::temp_dir);
315            cache.join("hyperspot").join("locks")
316        };
317
318        let dsn_hash = format!("{:x}", xxh3_64(self.dsn.as_bytes()));
319        let key_hash = format!("{:x}", xxh3_64(namespaced_key.as_bytes()));
320        base_dir.join(dsn_hash).join(format!("{key_hash}.lock"))
321    }
322}
323
324// --------------------------- Errors ------------------------------------------
325
/// Errors returned by the lock manager's acquisition functions.
#[derive(Error, Debug)]
pub enum DbLockError {
    /// Underlying filesystem error, e.g. creating the lock directory or
    /// opening the lock file.
    #[error("I/O error: {0}")]
    Io(#[from] std::io::Error),

    /// The lock file for `lock_name` (the namespaced `"module:key"`) already
    /// exists, i.e. another holder owns the lock.
    #[error("Lock already held: {lock_name}")]
    AlreadyHeld { lock_name: String },

    /// NOTE(review): not constructed anywhere in this module; presumably
    /// reserved for releasing or looking up a missing lock. Confirm before relying on it.
    #[error("Lock not found: {lock_name}")]
    NotFound { lock_name: String },
}
337
338// --------------------------- Tests -------------------------------------------
339
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use super::*;
    use anyhow::Result;

    /// Builds a `{prefix}_{nanos}` id so tests running in parallel never share a lock file.
    fn unique_id(prefix: &str) -> String {
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .expect("system clock is set before the Unix epoch")
            .as_nanos();
        format!("{prefix}_{nanos}")
    }

    #[tokio::test]
    async fn test_namespaced_locks() -> Result<()> {
        let lock_manager = LockManager::new("test_dsn".to_owned());
        let key = format!("{}_key", unique_id("test_ns"));

        // The same key under different modules must not conflict.
        let guard1 = lock_manager.lock("module1", &key).await?;
        let guard2 = lock_manager.lock("module2", &key).await?;

        assert_eq!(guard1.key(), format!("module1:{key}"));
        assert_eq!(guard2.key(), format!("module2:{key}"));

        guard1.release().await;
        guard2.release().await;
        Ok(())
    }

    #[tokio::test]
    async fn test_try_lock_with_timeout() -> Result<()> {
        let lock_manager = LockManager::new("test_dsn".to_owned());
        let test_id = unique_id("test_timeout");

        let held = lock_manager
            .lock("test_module", &format!("{test_id}_key"))
            .await?;

        // A different key must succeed quickly even with retries/timeouts configured.
        let config = LockConfig {
            max_wait: Some(Duration::from_millis(200)),
            initial_backoff: Duration::from_millis(50),
            max_attempts: Some(3),
            ..Default::default()
        };
        let acquired = lock_manager
            .try_lock("test_module", &format!("{test_id}_different_key"), config)
            .await?
            .expect("expected successful lock acquisition");

        acquired.release().await;
        held.release().await;
        Ok(())
    }

    #[tokio::test]
    async fn test_try_lock_success() -> Result<()> {
        let lock_manager = LockManager::new("test_dsn".to_owned());
        let key = format!("{}_key", unique_id("test_success"));

        let guard = lock_manager
            .try_lock("test_module", &key, LockConfig::default())
            .await?
            .expect("expected lock acquisition");

        guard.release().await;
        Ok(())
    }

    #[tokio::test]
    async fn test_double_lock_same_key_errors() -> Result<()> {
        let lock_manager = LockManager::new("test_dsn".to_owned());
        let test_id = unique_id("test_double");

        let guard = lock_manager.lock("test_module", &test_id).await?;
        let err = lock_manager
            .lock("test_module", &test_id)
            .await
            .unwrap_err();
        match err {
            DbLockError::AlreadyHeld { lock_name } => {
                assert!(lock_name.contains(&test_id));
            }
            other => panic!("unexpected error: {other:?}"),
        }

        guard.release().await;
        Ok(())
    }

    #[tokio::test]
    async fn test_try_lock_conflict_returns_none() -> Result<()> {
        let lock_manager = LockManager::new("test_dsn".to_owned());
        let key = unique_id("test_conflict");

        let guard = lock_manager.lock("module", &key).await?;
        let config = LockConfig {
            max_wait: Some(Duration::from_millis(100)),
            max_attempts: Some(2),
            ..Default::default()
        };
        let res = lock_manager.try_lock("module", &key, config).await?;
        assert!(res.is_none());

        guard.release().await;
        Ok(())
    }

    #[tokio::test]
    async fn test_release_allows_reacquire() -> Result<()> {
        let lock_manager = LockManager::new("test_dsn".to_owned());
        let key = unique_id("test_reacquire");

        let first = lock_manager.lock("module", &key).await?;
        first.release().await;

        // `release().await` removes the lock file, so the key is free again.
        let second = lock_manager.lock("module", &key).await?;
        second.release().await;
        Ok(())
    }

    #[tokio::test]
    async fn test_try_lock_zero_attempts_returns_none() -> Result<()> {
        let lock_manager = LockManager::new("test_dsn".to_owned());
        let key = unique_id("test_zero_attempts");

        let config = LockConfig {
            max_attempts: Some(0),
            ..Default::default()
        };
        let res = lock_manager.try_lock("module", &key, config).await?;
        assert!(res.is_none(), "zero attempts must never acquire the lock");
        Ok(())
    }
}