zeph_memory/graph/
entity_lock.rs1use std::time::Duration;
16
17use tokio::time::sleep;
18use zeph_db::{DbPool, query, query_scalar, sql};
19
20use crate::error::MemoryError;
21
22const LOCK_TTL_SECS: i64 = 120;
24
25const MAX_RETRIES: u32 = 3;
27
28const BASE_BACKOFF_MS: u64 = 50;
30
31pub struct EntityLockManager {
33 pool: DbPool,
34 session_id: String,
35}
36
37impl EntityLockManager {
38 #[must_use]
39 pub fn new(pool: DbPool, session_id: impl Into<String>) -> Self {
40 Self {
41 pool,
42 session_id: session_id.into(),
43 }
44 }
45
46 pub async fn try_acquire(&self, entity_name: &str) -> Result<bool, MemoryError> {
59 for attempt in 0..=MAX_RETRIES {
60 match self.try_acquire_once(entity_name).await? {
61 true => return Ok(true),
62 false if attempt == MAX_RETRIES => return Ok(false),
63 false => {
64 let backoff_ms = BASE_BACKOFF_MS * (1u64 << attempt);
65 sleep(Duration::from_millis(backoff_ms)).await;
66 }
67 }
68 }
69 Ok(false)
70 }
71
72 async fn try_acquire_once(&self, entity_name: &str) -> Result<bool, MemoryError> {
73 let acquired: bool = query_scalar(sql!(
80 "INSERT INTO entity_advisory_locks (entity_name, session_id, acquired_at, expires_at)
81 VALUES (?, ?, datetime('now'), datetime('now', ? || ' seconds'))
82 ON CONFLICT(entity_name) DO UPDATE SET
83 session_id = excluded.session_id,
84 acquired_at = excluded.acquired_at,
85 expires_at = excluded.expires_at
86 WHERE
87 -- reclaim if expired
88 entity_advisory_locks.expires_at < datetime('now')
89 OR
90 -- refresh if same session
91 entity_advisory_locks.session_id = excluded.session_id
92 RETURNING (session_id = ?) AS acquired"
93 ))
94 .bind(entity_name)
95 .bind(&self.session_id)
96 .bind(LOCK_TTL_SECS.to_string())
97 .bind(&self.session_id)
98 .fetch_optional(self.pool())
99 .await?
100 .unwrap_or(false);
101
102 Ok(acquired)
103 }
104
105 pub async fn extend_lock(
116 &self,
117 entity_name: &str,
118 extra_secs: i64,
119 ) -> Result<bool, MemoryError> {
120 let affected = query(sql!(
121 "UPDATE entity_advisory_locks
122 SET expires_at = datetime(expires_at, ? || ' seconds')
123 WHERE entity_name = ? AND session_id = ?"
124 ))
125 .bind(extra_secs.to_string())
126 .bind(entity_name)
127 .bind(&self.session_id)
128 .execute(self.pool())
129 .await?
130 .rows_affected();
131
132 Ok(affected > 0)
133 }
134
135 pub async fn release(&self, entity_name: &str) -> Result<(), MemoryError> {
143 query(sql!(
144 "DELETE FROM entity_advisory_locks
145 WHERE entity_name = ? AND session_id = ?"
146 ))
147 .bind(entity_name)
148 .bind(&self.session_id)
149 .execute(self.pool())
150 .await?;
151
152 Ok(())
153 }
154
155 pub async fn release_all(&self) -> Result<(), MemoryError> {
163 query(sql!(
164 "DELETE FROM entity_advisory_locks WHERE session_id = ?"
165 ))
166 .bind(&self.session_id)
167 .execute(self.pool())
168 .await?;
169
170 Ok(())
171 }
172
173 fn pool(&self) -> &DbPool {
174 &self.pool
175 }
176}
177
178#[cfg(test)]
179mod tests {
180 use super::*;
181 use crate::store::DbStore;
182
183 async fn make_lock_manager(session_id: &str) -> EntityLockManager {
184 let store = DbStore::with_pool_size(":memory:", 1)
185 .await
186 .expect("in-memory store");
187 EntityLockManager::new(store.pool().clone(), session_id)
188 }
189
190 async fn make_shared_managers(
191 session_a: &str,
192 session_b: &str,
193 ) -> (EntityLockManager, EntityLockManager) {
194 let store = DbStore::with_pool_size(":memory:", 2)
195 .await
196 .expect("in-memory store");
197 let pool = store.pool().clone();
198 (
199 EntityLockManager::new(pool.clone(), session_a),
200 EntityLockManager::new(pool, session_b),
201 )
202 }
203
204 #[tokio::test]
205 async fn try_acquire_succeeds_on_first_call() {
206 let mgr = make_lock_manager("session-a").await;
207 let acquired = mgr.try_acquire("entity::Foo").await.expect("try_acquire");
208 assert!(acquired);
209 }
210
211 #[tokio::test]
212 async fn try_acquire_same_session_refresh_succeeds() {
213 let mgr = make_lock_manager("session-a").await;
214 assert!(mgr.try_acquire("entity::Foo").await.expect("first"));
215 assert!(mgr.try_acquire("entity::Foo").await.expect("second"));
217 }
218
219 #[tokio::test]
220 async fn try_acquire_fails_when_held_by_different_session() {
221 let (a, b) = make_shared_managers("session-a", "session-b").await;
222 assert!(a.try_acquire("entity::Foo").await.expect("a acquires"));
223 let acquired = b.try_acquire("entity::Foo").await.expect("b tries");
228 assert!(
229 !acquired,
230 "session-b should not acquire a lock held by session-a"
231 );
232 }
233
234 #[tokio::test]
235 async fn expired_lock_is_reclaimed_by_new_session() {
236 let store = DbStore::with_pool_size(":memory:", 2)
237 .await
238 .expect("in-memory store");
239 let pool = store.pool().clone();
240 let b = EntityLockManager::new(pool.clone(), "session-b");
241 zeph_db::query(zeph_db::sql!(
243 "INSERT INTO entity_advisory_locks (entity_name, session_id, acquired_at, expires_at)
244 VALUES ('entity::Bar', 'session-a', datetime('now', '-200 seconds'), datetime('now', '-80 seconds'))"
245 ))
246 .execute(&pool)
247 .await
248 .expect("insert expired lock");
249
250 let acquired = b.try_acquire("entity::Bar").await.expect("try_acquire");
252 assert!(acquired, "session-b should reclaim an expired lock");
253 }
254
255 #[tokio::test]
256 async fn release_clears_the_lock() {
257 let (a, b) = make_shared_managers("session-a", "session-b").await;
258 a.try_acquire("entity::Baz").await.expect("acquire");
259 a.release("entity::Baz").await.expect("release");
260
261 let acquired = b.try_acquire("entity::Baz").await.expect("b reacquire");
263 assert!(acquired);
264 }
265
266 #[tokio::test]
267 async fn release_is_noop_for_wrong_session() {
268 let (a, b) = make_shared_managers("session-a", "session-b").await;
269 assert!(a.try_acquire("entity::Qux").await.expect("a acquires"));
270 b.release("entity::Qux").await.expect("release noop");
272 let acquired = b.try_acquire("entity::Qux").await.expect("b tries");
274 assert!(!acquired);
275 }
276
277 #[tokio::test]
278 async fn release_all_removes_all_session_locks() {
279 let mgr = make_lock_manager("session-a").await;
280 mgr.try_acquire("entity::One").await.expect("one");
281 mgr.try_acquire("entity::Two").await.expect("two");
282 mgr.release_all().await.expect("release_all");
283
284 assert!(mgr.try_acquire("entity::One").await.expect("re-one"));
286 assert!(mgr.try_acquire("entity::Two").await.expect("re-two"));
287 }
288
289 #[tokio::test]
290 async fn extend_lock_returns_true_for_owner() {
291 let mgr = make_lock_manager("session-a").await;
292 mgr.try_acquire("entity::Ext").await.expect("acquire");
293 let extended = mgr.extend_lock("entity::Ext", 60).await.expect("extend");
294 assert!(extended);
295 }
296
297 #[tokio::test]
298 async fn extend_lock_returns_false_for_non_owner() {
299 let (a, b) = make_shared_managers("session-a", "session-b").await;
300 a.try_acquire("entity::Ext2").await.expect("a acquires");
301 let extended = b.extend_lock("entity::Ext2", 60).await.expect("b extend");
302 assert!(
303 !extended,
304 "non-owner session should not be able to extend lock"
305 );
306 }
307}