entelix_persistence/lock.rs
1//! Distributed lock primitives.
2//!
3//! [`DistributedLock`] is the backend-agnostic trait; concrete impls
4//! live in the `postgres` (advisory locks) and `redis` (`SET NX PX` +
5//! Lua release script) modules — each gated behind its corresponding
6//! feature flag. [`with_session_lock`] composes the trait with
7//! [`crate::AdvisoryKey::for_session`] so the canonical
8//! `(tenant_id, thread_id)` lock-key derivation is the only thing
9//! callers need to know about.
10
11use std::time::{Duration, Instant};
12
13use async_trait::async_trait;
14use uuid::Uuid;
15
16use crate::advisory_key::AdvisoryKey;
17use crate::error::{PersistenceError, PersistenceResult};
18
/// Acquire-then-release primitive over a distributed key.
///
/// Implementors are responsible for:
/// - exclusive acquisition by `key`
/// - per-acquire token issuance, so that [`Self::release`] and
///   [`Self::extend`] only act on the acquisition that issued the guard:
///   a stale guard whose lock already expired and was re-acquired by a
///   later attempt must not release or extend the newer holder's lock
/// - TTL enforcement so a crashed holder doesn't deadlock
///   indefinitely
///
/// Cancellation: implementors honour the ambient cancellation token
/// propagated via [`entelix_core::ExecutionContext`] when one is in
/// scope. The trait itself does not take a context —
/// [`with_session_lock`] is the composition point.
#[async_trait]
pub trait DistributedLock: Send + Sync + 'static {
    /// Try once to acquire `key` with the given `ttl`.
    ///
    /// Returns `Ok(Some(guard))` on success and `Ok(None)` when the key is
    /// currently held by another holder.
    ///
    /// # Errors
    ///
    /// Returns `Err(_)` for backend failure.
    async fn try_acquire(
        &self,
        key: &AdvisoryKey,
        ttl: Duration,
    ) -> PersistenceResult<Option<LockGuard>>;

    /// Block until the lock is acquired or `deadline` elapses.
    ///
    /// `deadline` is a relative time budget (a `Duration`, not an absolute
    /// instant). `ttl` is the lifetime applied to the lock once acquired.
    /// Implementors poll with backoff between attempts.
    ///
    /// # Errors
    ///
    /// Returns `Err(_)` on backend failure. Because the success type is a
    /// bare [`LockGuard`], failing to acquire before `deadline` elapses must
    /// also surface as `Err(_)`. NOTE(review): the exact error variant used
    /// for the timeout is backend-defined — confirm per implementation.
    async fn acquire(
        &self,
        key: &AdvisoryKey,
        ttl: Duration,
        deadline: Duration,
    ) -> PersistenceResult<LockGuard>;

    /// Extend the holder's TTL to `ttl`.
    ///
    /// Returns `Ok(true)` when the lock was extended, and `Ok(false)` when
    /// the lock has already been released or expired (the guard's token no
    /// longer matches the stored value).
    ///
    /// # Errors
    ///
    /// Returns `Err(_)` for backend failure.
    async fn extend(&self, guard: &LockGuard, ttl: Duration) -> PersistenceResult<bool>;

    /// Release the lock, consuming the guard.
    ///
    /// The implementation is a no-op when the token already mismatches (the
    /// lock expired by TTL before the caller got here). Implementations are
    /// expected to call [`LockGuard::mark_released`] so the guard's `Drop`
    /// does not log a leak warning.
    ///
    /// # Errors
    ///
    /// Returns `Err(_)` for backend failure.
    async fn release(&self, guard: LockGuard) -> PersistenceResult<()>;
}
63
64/// Owned proof that the holder currently has exclusive access to a
65/// key.
66///
67/// Drop semantics: when a `LockGuard` is dropped without an explicit
68/// [`DistributedLock::release`] call, a `tracing::warn!` records the
69/// leak. The lock will still expire by TTL, so correctness is
70/// preserved, but the warning surfaces forgotten release calls in
71/// telemetry.
72#[derive(Debug)]
73pub struct LockGuard {
74 key: AdvisoryKey,
75 token: String,
76 acquired_at: Instant,
77 released: bool,
78}
79
80impl LockGuard {
81 /// Construct a guard. Backend [`DistributedLock`] impls call this
82 /// after a successful acquire — the `token` is the per-acquire
83 /// ownership marker the backend stores alongside the lock value.
84 pub fn new(key: AdvisoryKey) -> Self {
85 Self {
86 key,
87 token: Uuid::new_v4().to_string(),
88 acquired_at: Instant::now(),
89 released: false,
90 }
91 }
92
93 /// Borrow the lock key.
94 pub const fn key(&self) -> &AdvisoryKey {
95 &self.key
96 }
97
98 /// Borrow the per-acquire ownership token.
99 pub fn token(&self) -> &str {
100 &self.token
101 }
102
103 /// Wall-clock duration the guard has been held.
104 pub fn held_for(&self) -> Duration {
105 self.acquired_at.elapsed()
106 }
107
108 /// Mark the guard as released — backend [`DistributedLock`] impls
109 /// call this from `release()` so the [`Drop`] impl does not warn.
110 /// Outside backend code there is no reason to call this directly.
111 pub fn mark_released(&mut self) {
112 self.released = true;
113 }
114}
115
116impl Drop for LockGuard {
117 fn drop(&mut self) {
118 if !self.released {
119 tracing::warn!(
120 target: "entelix_persistence::lock",
121 key = %self.key,
122 held_ms = self.acquired_at.elapsed().as_millis() as u64,
123 "LockGuard dropped without explicit release; lock will expire by TTL only"
124 );
125 }
126 }
127}
128
/// Default lifetime of a session lock: 30 seconds.
///
/// Sized to outlast a typical model call while keeping the window in
/// which a crashed holder blocks the next request reasonably short.
pub const DEFAULT_SESSION_LOCK_TTL: Duration = Duration::from_millis(30_000);

/// Default upper bound on how long a caller waits to acquire a session
/// lock: 5 seconds.
pub const DEFAULT_SESSION_LOCK_DEADLINE: Duration = Duration::from_millis(5_000);
136
137/// Acquire a session lock keyed by `(tenant, thread)`, run the
138/// caller's closure, then release the lock. The lock is released even
139/// if `f` returns an error.
140///
141/// `lock` is any backend that implements [`DistributedLock`] —
142/// typically a `postgres::PostgresPersistence` or
143/// `redis::RedisPersistence` handle. Pass `None` for `ttl` /
144/// `deadline` to use the defaults.
145///
146/// The closure does not receive the guard — `with_session_lock`
147/// owns the lifecycle. Callers that need to extend the lock during a
148/// long-running operation use [`DistributedLock::acquire`] /
149/// [`DistributedLock::extend`] / [`DistributedLock::release`]
150/// directly.
151pub async fn with_session_lock<L, F, Fut, T, E>(
152 lock: &L,
153 tenant_id: &entelix_core::TenantId,
154 thread_id: &str,
155 ttl: Option<Duration>,
156 deadline: Option<Duration>,
157 f: F,
158) -> PersistenceResult<T>
159where
160 L: DistributedLock + ?Sized,
161 F: FnOnce() -> Fut,
162 Fut: std::future::Future<Output = std::result::Result<T, E>>,
163 E: Into<PersistenceError>,
164{
165 let key = AdvisoryKey::for_session(tenant_id, thread_id);
166 let ttl = ttl.unwrap_or(DEFAULT_SESSION_LOCK_TTL);
167 let deadline = deadline.unwrap_or(DEFAULT_SESSION_LOCK_DEADLINE);
168
169 let guard = lock.acquire(&key, ttl, deadline).await?;
170 let outcome = f().await;
171 // Best-effort release — TTL is the safety net.
172 if let Err(e) = lock.release(guard).await {
173 tracing::warn!(
174 target: "entelix_persistence::lock",
175 error = %e,
176 "lock release failed; relying on TTL expiry"
177 );
178 }
179 outcome.map_err(Into::into)
180}