Skip to main content

reddb_server/auth/
locks.rs

1//! Advisory locks (PG-compatible `pg_advisory_*` family).
2//!
3//! Connection-scoped, reentrant-safe-only-through-explicit-unlock.
4//! Backed by a single global mutex + condition variable — fine for
5//! the embedded/single-node workload RedDB targets today. If it ever
6//! becomes a hotspot, split into a `DashMap<i64, Mutex<_>>`.
7//!
8//! Ownership is tracked by `ConnId` (the same id savepoints use).
9//! `release_all(conn)` drops every lock a connection holds — call it
10//! on connection close so crashed sessions don't wedge other callers
11//! forever.
12
13use parking_lot::{Condvar, Mutex};
14use std::collections::HashMap;
15use std::sync::OnceLock;
16
17pub type ConnId = u64;
18
19/// Process-global table. One per runtime is enough; lock ids are
20/// a shared namespace across connections by PG semantics.
21pub struct AdvisoryLocks {
22    state: Mutex<HashMap<i64, ConnId>>,
23    cv: Condvar,
24}
25
26impl AdvisoryLocks {
27    fn new() -> Self {
28        Self {
29            state: Mutex::new(HashMap::new()),
30            cv: Condvar::new(),
31        }
32    }
33
34    /// Try to acquire `key` for `conn`. Returns `true` if the caller
35    /// now owns it (either fresh acquire or already holds it —
36    /// reentrant within the same connection, matching PG). Never
37    /// blocks.
38    pub fn try_acquire(&self, key: i64, conn: ConnId) -> bool {
39        let mut map = self.state.lock();
40        match map.get(&key).copied() {
41            Some(owner) if owner == conn => true,
42            Some(_) => false,
43            None => {
44                map.insert(key, conn);
45                true
46            }
47        }
48    }
49
50    /// Acquire `key` for `conn`, blocking until the current owner
51    /// (if any) releases. Reentrant within the same connection.
52    pub fn acquire(&self, key: i64, conn: ConnId) {
53        let mut map = self.state.lock();
54        loop {
55            match map.get(&key).copied() {
56                Some(owner) if owner == conn => return,
57                Some(_) => self.cv.wait(&mut map),
58                None => {
59                    map.insert(key, conn);
60                    return;
61                }
62            }
63        }
64    }
65
66    /// Release `key` for `conn`. Returns `true` if `conn` held the
67    /// lock. A mismatch returns `false` and leaves the table
68    /// untouched (PG behaviour: `pg_advisory_unlock` returns bool
69    /// without panicking on foreign locks).
70    pub fn release(&self, key: i64, conn: ConnId) -> bool {
71        let mut map = self.state.lock();
72        match map.get(&key).copied() {
73            Some(owner) if owner == conn => {
74                map.remove(&key);
75                self.cv.notify_all();
76                true
77            }
78            _ => false,
79        }
80    }
81
82    /// Release every lock held by `conn`. Returns the number of
83    /// locks dropped. Call this on connection close.
84    pub fn release_all(&self, conn: ConnId) -> usize {
85        let mut map = self.state.lock();
86        let before = map.len();
87        map.retain(|_, owner| *owner != conn);
88        let dropped = before - map.len();
89        if dropped > 0 {
90            self.cv.notify_all();
91        }
92        dropped
93    }
94
95    /// Test-visible: is `key` currently held (by anyone)?
96    #[cfg(test)]
97    pub fn is_held(&self, key: i64) -> bool {
98        self.state.lock().contains_key(&key)
99    }
100}
101
102static GLOBAL: OnceLock<AdvisoryLocks> = OnceLock::new();
103
104/// Process-wide singleton accessor. Lazy-init on first call.
105pub fn global() -> &'static AdvisoryLocks {
106    GLOBAL.get_or_init(AdvisoryLocks::new)
107}
108
109#[cfg(test)]
110mod tests {
111    use super::*;
112
113    #[test]
114    fn try_acquire_and_release() {
115        let locks = AdvisoryLocks::new();
116        assert!(locks.try_acquire(1, 100));
117        assert!(!locks.try_acquire(1, 200), "other conn cannot steal");
118        assert!(locks.try_acquire(1, 100), "same conn is reentrant");
119        assert!(locks.release(1, 100));
120        assert!(!locks.is_held(1));
121    }
122
123    #[test]
124    fn release_all_drops_only_owned() {
125        let locks = AdvisoryLocks::new();
126        assert!(locks.try_acquire(1, 100));
127        assert!(locks.try_acquire(2, 100));
128        assert!(locks.try_acquire(3, 200));
129        assert_eq!(locks.release_all(100), 2);
130        assert!(!locks.is_held(1));
131        assert!(!locks.is_held(2));
132        assert!(locks.is_held(3));
133    }
134
135    #[test]
136    fn release_mismatch_returns_false() {
137        let locks = AdvisoryLocks::new();
138        assert!(locks.try_acquire(5, 100));
139        assert!(!locks.release(5, 999));
140        assert!(locks.is_held(5));
141    }
142}