Skip to main content

cognee_session/
improve_lock.rs

1//! Per-session improve-lock registry.
2//!
3//! Mirrors Python's `cognee.infrastructure.locks.session_lock` module:
4//! a non-blocking claim for long-running `improve()` calls. The registry
5//! is a process-global `HashSet` guarded by a sync `Mutex` so the
6//! check-and-add happens atomically.
7//!
8//! Scope: single-process (matches Python's default single-worker FastAPI
9//! model). For multi-process deployments, layer a distributed lock on top —
10//! the call sites are factored so that is a local change.
11
12use std::collections::HashSet;
13use std::sync::{Mutex, OnceLock};
14
/// Process-global set of session ids whose improve-lock is currently claimed.
///
/// Initialised lazily on first access through [`registry`].
static IMPROVING: OnceLock<Mutex<HashSet<String>>> = OnceLock::new();

/// Returns the shared registry, creating the empty set on first use.
///
/// `OnceLock` guarantees the initialiser runs at most once, so every caller
/// observes the same `Mutex`.
fn registry() -> &'static Mutex<HashSet<String>> {
    let make_empty = || Mutex::new(HashSet::new());
    IMPROVING.get_or_init(make_empty)
}
20
21/// Atomically claim the improve-lock for `session_id`.
22///
23/// Returns `true` iff we got it (i.e. no other caller is currently
24/// improving this session). Empty `session_id` is a no-op — always
25/// returns `true` (matches Python's `if not session_id: return True`).
26/// Caller MUST release the lock when done; prefer [`ImproveLockGuard`].
27pub fn try_acquire_improve_lock(session_id: &str) -> bool {
28    if session_id.is_empty() {
29        return true;
30    }
31    // lock poison is unrecoverable
32    #[allow(clippy::unwrap_used, reason = "lock poison is unrecoverable")]
33    let mut set = registry().lock().unwrap();
34    set.insert(session_id.to_string())
35}
36
37/// Release the improve-lock for `session_id`. Idempotent.
38///
39/// Empty `session_id` is a no-op (mirrors Python's early-return).
40pub fn release_improve_lock(session_id: &str) {
41    if session_id.is_empty() {
42        return;
43    }
44    // lock poison is unrecoverable
45    #[allow(clippy::unwrap_used, reason = "lock poison is unrecoverable")]
46    registry().lock().unwrap().remove(session_id);
47}
48
49/// RAII guard that releases the improve-lock on drop.
50///
51/// Use [`ImproveLockGuard::acquire`] to claim and wrap the lock so that
52/// any early return or panic automatically releases it (matches Python's
53/// `try/finally`).
54///
55/// The guard stores a `String`, not a `MutexGuard`, so it is `Send` and
56/// can safely be held across `.await` points.
57pub struct ImproveLockGuard(Option<String>);
58
59impl ImproveLockGuard {
60    /// Attempt to acquire the lock for `session_id`.
61    ///
62    /// Returns `Some(guard)` if the lock was acquired, `None` if another
63    /// task already holds it.
64    pub fn acquire(session_id: &str) -> Option<Self> {
65        if try_acquire_improve_lock(session_id) {
66            Some(Self(Some(session_id.to_string())))
67        } else {
68            None
69        }
70    }
71}
72
73impl Drop for ImproveLockGuard {
74    fn drop(&mut self) {
75        if let Some(ref s) = self.0.take() {
76            release_improve_lock(s);
77        }
78    }
79}
80
#[cfg(test)]
#[allow(
    clippy::unwrap_used,
    clippy::expect_used,
    reason = "test code — panics are acceptable failures"
)]
mod tests {
    //! Unit tests for the improve-lock registry.
    //!
    //! The registry is process-global and tests run in parallel, so every
    //! test that registers an id uses a fresh UUID-suffixed session id.

    use super::*;

    /// The empty id is never registered, so it can be "claimed" repeatedly.
    #[test]
    fn empty_session_id_always_acquires() {
        assert!(try_acquire_improve_lock(""));
        assert!(try_acquire_improve_lock(""));
        // releasing an empty id is a no-op and should not panic
        release_improve_lock("");
    }

    /// A held id rejects a second claim on the same thread.
    #[test]
    fn second_acquire_returns_false() {
        let sid = format!("test-lock-second-{}", uuid::Uuid::new_v4());
        assert!(try_acquire_improve_lock(&sid), "first acquire must succeed");
        assert!(
            !try_acquire_improve_lock(&sid),
            "second acquire must fail while first is held"
        );
        release_improve_lock(&sid);
    }

    /// Threads racing for the same id: exactly one claim may succeed.
    #[test]
    fn improve_lock_excludes_concurrent() {
        const CONTENDERS: usize = 8;

        let sid = format!("test-lock-excl-{}", uuid::Uuid::new_v4());
        // The barrier lines all contenders up so the claims really overlap.
        let barrier = std::sync::Barrier::new(CONTENDERS);

        let winners = std::thread::scope(|scope| {
            let mut handles = Vec::with_capacity(CONTENDERS);
            for _ in 0..CONTENDERS {
                handles.push(scope.spawn(|| {
                    barrier.wait();
                    try_acquire_improve_lock(&sid)
                }));
            }
            handles
                .into_iter()
                .map(|handle| handle.join().expect("contender thread panicked"))
                .filter(|&won| won)
                .count()
        });
        assert_eq!(winners, 1, "exactly one concurrent claim must succeed");

        // After release, acquire succeeds again.
        release_improve_lock(&sid);
        assert!(try_acquire_improve_lock(&sid));
        // Cleanup.
        release_improve_lock(&sid);
    }

    /// Dropping the guard makes the id claimable again.
    #[test]
    fn guard_releases_on_drop() {
        let sid = format!("test-lock-guard-{}", uuid::Uuid::new_v4());
        {
            let guard = ImproveLockGuard::acquire(&sid);
            assert!(guard.is_some(), "guard must be acquired");
            // While guard is held, a second acquire must fail.
            assert!(!try_acquire_improve_lock(&sid));
        } // guard drops here
        // After drop, acquire must succeed again.
        assert!(try_acquire_improve_lock(&sid));
        release_improve_lock(&sid);
    }

    /// A second guard cannot be obtained while the first is alive.
    #[test]
    fn guard_acquire_fails_when_held() {
        let sid = format!("test-lock-guard-fail-{}", uuid::Uuid::new_v4());
        let _g1 = ImproveLockGuard::acquire(&sid).expect("first guard");
        let g2 = ImproveLockGuard::acquire(&sid);
        assert!(g2.is_none(), "second guard must not be acquired");
    }

    /// Guards on the empty id follow the same no-op semantics as the free
    /// functions.
    #[test]
    fn empty_session_id_guard_always_acquires() {
        let g1 = ImproveLockGuard::acquire("");
        assert!(g1.is_some());
        // A second guard on "" must also return Some (no-op semantics)
        let g2 = ImproveLockGuard::acquire("");
        assert!(g2.is_some());
    }
}