Skip to main content

zeph_memory/graph/
entity_lock.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Advisory entity locking for multi-agent `GraphStore` coordination (#2478).
5//!
6//! `SQLite` does not provide row-level locks. This module implements a soft advisory
7//! locking pattern using a dedicated `entity_advisory_locks` table. Locks are
8//! automatically expired after 120 seconds (covers worst-case slow LLM calls).
9//!
10//! Expired locks are reclaimed on the next `try_acquire` call rather than via a
11//! cleanup sweep. When a lock is reclaimed by another session, the original holder's
12//! subsequent writes follow last-writer-wins semantics — acceptable for entity
13//! resolution where duplicate entities can be merged in a later consolidation sweep.
14
15use std::time::Duration;
16
17use tokio::time::sleep;
18use zeph_db::{DbPool, query, query_scalar, sql};
19
20use crate::error::MemoryError;
21
/// Lifetime of an advisory lock in seconds, counted from its last acquisition or refresh.
///
/// Deliberately generous: it has to outlast the slowest LLM call made while a lock is held.
const LOCK_TTL_SECS: i64 = 2 * 60;

/// Number of additional attempts `try_acquire` makes after its first attempt fails
/// because a different session owns the lock.
const MAX_RETRIES: u32 = 3;

/// Delay (milliseconds) before the first retry; every subsequent retry doubles it.
const BASE_BACKOFF_MS: u64 = 50;
30
31/// Advisory entity lock manager for a single session.
32pub struct EntityLockManager {
33    pool: DbPool,
34    session_id: String,
35}
36
37impl EntityLockManager {
38    #[must_use]
39    pub fn new(pool: DbPool, session_id: impl Into<String>) -> Self {
40        Self {
41            pool,
42            session_id: session_id.into(),
43        }
44    }
45
46    /// Try to acquire an advisory lock on `entity_name`.
47    ///
48    /// - If no lock exists: INSERT and return `true`.
49    /// - If the current session already holds the lock: UPDATE `expires_at`, return `true`.
50    /// - If another session holds a non-expired lock: retry with exponential backoff.
51    /// - After `MAX_RETRIES` failures: return `false` (caller proceeds without lock).
52    ///
53    /// Expired locks (past `expires_at`) are atomically reclaimed on the INSERT conflict.
54    ///
55    /// # Errors
56    ///
57    /// Returns an error on database failures.
58    pub async fn try_acquire(&self, entity_name: &str) -> Result<bool, MemoryError> {
59        for attempt in 0..=MAX_RETRIES {
60            match self.try_acquire_once(entity_name).await? {
61                true => return Ok(true),
62                false if attempt == MAX_RETRIES => return Ok(false),
63                false => {
64                    let backoff_ms = BASE_BACKOFF_MS * (1u64 << attempt);
65                    sleep(Duration::from_millis(backoff_ms)).await;
66                }
67            }
68        }
69        Ok(false)
70    }
71
72    async fn try_acquire_once(&self, entity_name: &str) -> Result<bool, MemoryError> {
73        // INSERT OR IGNORE: succeeds if no row exists.
74        // Then UPDATE: refreshes the lock if held by this session OR if it has expired.
75        // A single round-trip via RETURNING id would be nicer but the expired-or-same-session
76        // condition requires a WHERE clause that INSERT OR IGNORE cannot express.
77        // We use a two-statement approach in a transaction for atomicity.
78
79        let acquired: bool = query_scalar(sql!(
80            "INSERT INTO entity_advisory_locks (entity_name, session_id, acquired_at, expires_at)
81             VALUES (?, ?, datetime('now'), datetime('now', ? || ' seconds'))
82             ON CONFLICT(entity_name) DO UPDATE SET
83                 session_id  = excluded.session_id,
84                 acquired_at = excluded.acquired_at,
85                 expires_at  = excluded.expires_at
86             WHERE
87                 -- reclaim if expired
88                 entity_advisory_locks.expires_at < datetime('now')
89                 OR
90                 -- refresh if same session
91                 entity_advisory_locks.session_id = excluded.session_id
92             RETURNING (session_id = ?) AS acquired"
93        ))
94        .bind(entity_name)
95        .bind(&self.session_id)
96        .bind(LOCK_TTL_SECS.to_string())
97        .bind(&self.session_id)
98        .fetch_optional(self.pool())
99        .await?
100        .unwrap_or(false);
101
102        Ok(acquired)
103    }
104
105    /// Extend the TTL of a lock held by this session.
106    ///
107    /// Called before long operations (e.g., an LLM call inside entity resolution)
108    /// to prevent the lock from expiring while work is in progress.
109    ///
110    /// Returns `true` if the lock was extended (still held by this session).
111    ///
112    /// # Errors
113    ///
114    /// Returns an error on database failures.
115    pub async fn extend_lock(
116        &self,
117        entity_name: &str,
118        extra_secs: i64,
119    ) -> Result<bool, MemoryError> {
120        let affected = query(sql!(
121            "UPDATE entity_advisory_locks
122             SET expires_at = datetime(expires_at, ? || ' seconds')
123             WHERE entity_name = ? AND session_id = ?"
124        ))
125        .bind(extra_secs.to_string())
126        .bind(entity_name)
127        .bind(&self.session_id)
128        .execute(self.pool())
129        .await?
130        .rows_affected();
131
132        Ok(affected > 0)
133    }
134
135    /// Release the lock on `entity_name` held by this session.
136    ///
137    /// No-op if the lock was already reclaimed by another session.
138    ///
139    /// # Errors
140    ///
141    /// Returns an error on database failures.
142    pub async fn release(&self, entity_name: &str) -> Result<(), MemoryError> {
143        query(sql!(
144            "DELETE FROM entity_advisory_locks
145             WHERE entity_name = ? AND session_id = ?"
146        ))
147        .bind(entity_name)
148        .bind(&self.session_id)
149        .execute(self.pool())
150        .await?;
151
152        Ok(())
153    }
154
155    /// Release all locks held by this session.
156    ///
157    /// Called on agent shutdown to avoid leaving locks until TTL expiry.
158    ///
159    /// # Errors
160    ///
161    /// Returns an error on database failures.
162    pub async fn release_all(&self) -> Result<(), MemoryError> {
163        query(sql!(
164            "DELETE FROM entity_advisory_locks WHERE session_id = ?"
165        ))
166        .bind(&self.session_id)
167        .execute(self.pool())
168        .await?;
169
170        Ok(())
171    }
172
173    fn pool(&self) -> &DbPool {
174        &self.pool
175    }
176}
177
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::DbStore;

    /// Fresh in-memory store with a single-connection pool, plus one manager.
    async fn make_lock_manager(session_id: &str) -> EntityLockManager {
        let store = DbStore::with_pool_size(":memory:", 1)
            .await
            .expect("in-memory store");
        EntityLockManager::new(store.pool().clone(), session_id)
    }

    /// Two managers for different sessions sharing one in-memory store, so they
    /// contend for the same rows in `entity_advisory_locks`.
    async fn make_shared_managers(
        session_a: &str,
        session_b: &str,
    ) -> (EntityLockManager, EntityLockManager) {
        let store = DbStore::with_pool_size(":memory:", 2)
            .await
            .expect("in-memory store");
        let pool = store.pool().clone();
        (
            EntityLockManager::new(pool.clone(), session_a),
            EntityLockManager::new(pool, session_b),
        )
    }

    #[tokio::test]
    async fn try_acquire_succeeds_on_first_call() {
        let mgr = make_lock_manager("session-a").await;
        let acquired = mgr.try_acquire("entity::Foo").await.expect("try_acquire");
        assert!(acquired);
    }

    #[tokio::test]
    async fn try_acquire_same_session_refresh_succeeds() {
        let mgr = make_lock_manager("session-a").await;
        assert!(mgr.try_acquire("entity::Foo").await.expect("first"));
        // Same session — should refresh and return true immediately.
        assert!(mgr.try_acquire("entity::Foo").await.expect("second"));
    }

    #[tokio::test]
    async fn try_acquire_fails_when_held_by_different_session() {
        let (a, b) = make_shared_managers("session-a", "session-b").await;
        assert!(a.try_acquire("entity::Foo").await.expect("a acquires"));
        // Deliberately goes through the public API, including the retry loop: session B
        // exhausts MAX_RETRIES with exponential backoff (~350ms with the current constants).
        let acquired = b.try_acquire("entity::Foo").await.expect("b tries");
        assert!(
            !acquired,
            "session-b should not acquire a lock held by session-a"
        );
    }

    #[tokio::test]
    async fn expired_lock_is_reclaimed_by_new_session() {
        let store = DbStore::with_pool_size(":memory:", 2)
            .await
            .expect("in-memory store");
        let pool = store.pool().clone();
        let b = EntityLockManager::new(pool.clone(), "session-b");
        // Insert an already-expired lock directly into the table.
        zeph_db::query(zeph_db::sql!(
            "INSERT INTO entity_advisory_locks (entity_name, session_id, acquired_at, expires_at)
             VALUES ('entity::Bar', 'session-a', datetime('now', '-200 seconds'), datetime('now', '-80 seconds'))"
        ))
        .execute(&pool)
        .await
        .expect("insert expired lock");

        // Session B should reclaim the expired lock.
        let acquired = b.try_acquire("entity::Bar").await.expect("try_acquire");
        assert!(acquired, "session-b should reclaim an expired lock");
    }

    #[tokio::test]
    async fn release_clears_the_lock() {
        let (a, b) = make_shared_managers("session-a", "session-b").await;
        assert!(a.try_acquire("entity::Baz").await.expect("acquire"));
        a.release("entity::Baz").await.expect("release");

        // After release, a different session acquires on its very first attempt.
        let acquired = b.try_acquire_once("entity::Baz").await.expect("b reacquire");
        assert!(acquired);
    }

    #[tokio::test]
    async fn release_is_noop_for_wrong_session() {
        let (a, b) = make_shared_managers("session-a", "session-b").await;
        assert!(a.try_acquire("entity::Qux").await.expect("a acquires"));
        // Session B releasing a lock it doesn't hold: should be a no-op.
        b.release("entity::Qux").await.expect("release noop");
        // Session A still holds the lock, so B's single attempt fails. `try_acquire_once`
        // avoids the retry backoff of the public API.
        let acquired = b.try_acquire_once("entity::Qux").await.expect("b tries");
        assert!(!acquired);
    }

    #[tokio::test]
    async fn release_all_removes_all_session_locks() {
        // Verification must use a *different* session: the owning session would
        // re-acquire anyway through the same-session refresh path.
        let (a, b) = make_shared_managers("session-a", "session-b").await;
        assert!(a.try_acquire("entity::One").await.expect("one"));
        assert!(a.try_acquire("entity::Two").await.expect("two"));
        a.release_all().await.expect("release_all");

        // Both locks removed — session B acquires each on its first attempt.
        assert!(b.try_acquire_once("entity::One").await.expect("b one"));
        assert!(b.try_acquire_once("entity::Two").await.expect("b two"));
    }

    #[tokio::test]
    async fn release_all_keeps_other_session_locks() {
        let (a, b) = make_shared_managers("session-a", "session-b").await;
        assert!(a.try_acquire("entity::Mine").await.expect("a acquires"));
        assert!(b.try_acquire("entity::Theirs").await.expect("b acquires"));
        a.release_all().await.expect("release_all");

        // Session B's lock survives session A's release_all.
        let acquired = a
            .try_acquire_once("entity::Theirs")
            .await
            .expect("a tries");
        assert!(!acquired, "release_all must only drop the caller's own locks");
    }

    #[tokio::test]
    async fn extend_lock_returns_true_for_owner() {
        let mgr = make_lock_manager("session-a").await;
        assert!(mgr.try_acquire("entity::Ext").await.expect("acquire"));
        let extended = mgr.extend_lock("entity::Ext", 60).await.expect("extend");
        assert!(extended);
    }

    #[tokio::test]
    async fn extend_lock_returns_false_for_non_owner() {
        let (a, b) = make_shared_managers("session-a", "session-b").await;
        assert!(a.try_acquire("entity::Ext2").await.expect("a acquires"));
        let extended = b.extend_lock("entity::Ext2", 60).await.expect("b extend");
        assert!(
            !extended,
            "non-owner session should not be able to extend lock"
        );
    }
}