Skip to main content

entelix_persistence/
lock.rs

1//! Distributed lock primitives.
2//!
3//! [`DistributedLock`] is the backend-agnostic trait; concrete impls
4//! live in the `postgres` (advisory locks) and `redis` (`SET NX PX` +
5//! Lua release script) modules — each gated behind its corresponding
6//! feature flag. [`with_session_lock`] composes the trait with
7//! [`crate::AdvisoryKey::for_session`] so the canonical
8//! `(tenant_id, thread_id)` lock-key derivation is the only thing
9//! callers need to know about.
10
11use std::time::{Duration, Instant};
12
13use async_trait::async_trait;
14use uuid::Uuid;
15
16use crate::advisory_key::AdvisoryKey;
17use crate::error::{PersistenceError, PersistenceResult};
18
19/// Acquire-then-release primitive over a distributed key.
20///
21/// Implementors are responsible for:
22/// - exclusive acquisition by `key`
23/// - per-acquire token issuance so [`Self::release`] is idempotent
24///   even if the same key is held by a later attempt
25/// - TTL enforcement so a crashed holder doesn't deadlock
26///   indefinitely
27///
28/// Cancellation: implementors honour the ambient cancellation token
29/// propagated via [`entelix_core::ExecutionContext`] when one is in
30/// scope. The trait itself does not take a context —
31/// [`with_session_lock`] is the composition point.
32#[async_trait]
33pub trait DistributedLock: Send + Sync + 'static {
34    /// Try once to acquire `key` with the given `ttl`. Returns
35    /// `Ok(Some(guard))` on success, `Ok(None)` when the key is
36    /// currently held by another holder, and `Err(_)` for backend
37    /// failure.
38    async fn try_acquire(
39        &self,
40        key: &AdvisoryKey,
41        ttl: Duration,
42    ) -> PersistenceResult<Option<LockGuard>>;
43
44    /// Block until the lock is acquired or `deadline` elapses.
45    /// Implementors poll with backoff between attempts.
46    async fn acquire(
47        &self,
48        key: &AdvisoryKey,
49        ttl: Duration,
50        deadline: Duration,
51    ) -> PersistenceResult<LockGuard>;
52
53    /// Extend the holder's TTL. Returns `Ok(false)` when the lock has
54    /// already been released or expired (the guard's token no longer
55    /// matches the stored value).
56    async fn extend(&self, guard: &LockGuard, ttl: Duration) -> PersistenceResult<bool>;
57
58    /// Release the lock. Consumes the guard. The implementation is a
59    /// no-op when the token already mismatches (lock expired by TTL
60    /// before the caller got here).
61    async fn release(&self, guard: LockGuard) -> PersistenceResult<()>;
62}
63
64/// Owned proof that the holder currently has exclusive access to a
65/// key.
66///
67/// Drop semantics: when a `LockGuard` is dropped without an explicit
68/// [`DistributedLock::release`] call, a `tracing::warn!` records the
69/// leak. The lock will still expire by TTL, so correctness is
70/// preserved, but the warning surfaces forgotten release calls in
71/// telemetry.
72#[derive(Debug)]
73pub struct LockGuard {
74    key: AdvisoryKey,
75    token: String,
76    acquired_at: Instant,
77    released: bool,
78}
79
80impl LockGuard {
81    /// Construct a guard. Backend [`DistributedLock`] impls call this
82    /// after a successful acquire — the `token` is the per-acquire
83    /// ownership marker the backend stores alongside the lock value.
84    pub fn new(key: AdvisoryKey) -> Self {
85        Self {
86            key,
87            token: Uuid::new_v4().to_string(),
88            acquired_at: Instant::now(),
89            released: false,
90        }
91    }
92
93    /// Borrow the lock key.
94    pub const fn key(&self) -> &AdvisoryKey {
95        &self.key
96    }
97
98    /// Borrow the per-acquire ownership token.
99    pub fn token(&self) -> &str {
100        &self.token
101    }
102
103    /// Wall-clock duration the guard has been held.
104    pub fn held_for(&self) -> Duration {
105        self.acquired_at.elapsed()
106    }
107
108    /// Mark the guard as released — backend [`DistributedLock`] impls
109    /// call this from `release()` so the [`Drop`] impl does not warn.
110    /// Outside backend code there is no reason to call this directly.
111    pub fn mark_released(&mut self) {
112        self.released = true;
113    }
114}
115
116impl Drop for LockGuard {
117    fn drop(&mut self) {
118        if !self.released {
119            tracing::warn!(
120                target: "entelix_persistence::lock",
121                key = %self.key,
122                held_ms = self.acquired_at.elapsed().as_millis() as u64,
123                "LockGuard dropped without explicit release; lock will expire by TTL only"
124            );
125        }
126    }
127}
128
/// Session-lock TTL used when the caller passes `None`: 30 seconds.
///
/// Sized to outlast a typical model call while keeping the stall that
/// follows a crashed holder reasonably short.
pub const DEFAULT_SESSION_LOCK_TTL: Duration = Duration::from_secs(30);

/// Total time a caller waits for a session lock when the caller passes
/// `None`: 5 seconds.
pub const DEFAULT_SESSION_LOCK_DEADLINE: Duration = Duration::from_secs(5);
136
137/// Acquire a session lock keyed by `(tenant, thread)`, run the
138/// caller's closure, then release the lock. The lock is released even
139/// if `f` returns an error.
140///
141/// `lock` is any backend that implements [`DistributedLock`] —
142/// typically a `postgres::PostgresPersistence` or
143/// `redis::RedisPersistence` handle. Pass `None` for `ttl` /
144/// `deadline` to use the defaults.
145///
146/// The closure does not receive the guard — `with_session_lock`
147/// owns the lifecycle. Callers that need to extend the lock during a
148/// long-running operation use [`DistributedLock::acquire`] /
149/// [`DistributedLock::extend`] / [`DistributedLock::release`]
150/// directly.
151pub async fn with_session_lock<L, F, Fut, T, E>(
152    lock: &L,
153    tenant_id: &entelix_core::TenantId,
154    thread_id: &str,
155    ttl: Option<Duration>,
156    deadline: Option<Duration>,
157    f: F,
158) -> PersistenceResult<T>
159where
160    L: DistributedLock + ?Sized,
161    F: FnOnce() -> Fut,
162    Fut: std::future::Future<Output = std::result::Result<T, E>>,
163    E: Into<PersistenceError>,
164{
165    let key = AdvisoryKey::for_session(tenant_id, thread_id);
166    let ttl = ttl.unwrap_or(DEFAULT_SESSION_LOCK_TTL);
167    let deadline = deadline.unwrap_or(DEFAULT_SESSION_LOCK_DEADLINE);
168
169    let guard = lock.acquire(&key, ttl, deadline).await?;
170    let outcome = f().await;
171    // Best-effort release — TTL is the safety net.
172    if let Err(e) = lock.release(guard).await {
173        tracing::warn!(
174            target: "entelix_persistence::lock",
175            error = %e,
176            "lock release failed; relying on TTL expiry"
177        );
178    }
179    outcome.map_err(Into::into)
180}