Skip to main content

arcly_http/resilience/
dlock.rs

1//! Distributed lock with fencing tokens — cluster-wide single-holder sections.
2//!
3//! Solves "exactly one replica should do this" (outbox relays, cron jobs,
4//! schema migrations) without per-process assumptions. Three properties a
5//! naive Redis `SET NX` misses, all enforced here:
6//!
7//! 1. **Fencing tokens** — every successful acquire returns a cluster-wide
8//!    monotonic `u64`. Downstream writers persist/compare it, so a holder
9//!    that lost its lock during a GC pause (TTL expired, someone else
10//!    acquired) writes with a *stale* token and is rejected by storage —
11//!    the classic Kleppmann lock-safety argument.
12//! 2. **Compare-and-delete release** — release only removes the key when the
13//!    stored random token matches ours (Lua CAD), so an expired holder can
14//!    never delete its successor's lock.
15//! 3. **Auto-renew heartbeat** — a background task renews at `ttl/3` while
16//!    the guard lives; dropping the guard aborts the heartbeat and releases.
17//!
18//! Failure policy is conservative: if the backend is unreachable the lock is
19//! reported as *not acquired* — for leader-election workloads, "nobody runs"
20//! is recoverable; "everybody runs" may not be.
21
22use std::sync::Arc;
23use std::time::Duration;
24
25use futures::future::BoxFuture;
26
27// ─── Backend contract ─────────────────────────────────────────────────────────
28
29/// Storage backend (Redis in production). Implementations MUST make
30/// `try_acquire` atomic (single Lua script / transaction).
31pub trait DLockBackend: Send + Sync + 'static {
32    /// Atomically: if `key` is free, store `token` with `ttl_ms` and return
33    /// `Some(fencing_token)` from a monotonic cluster counter; else `None`.
34    fn try_acquire<'a>(
35        &'a self,
36        key: &'a str,
37        token: &'a str,
38        ttl_ms: u64,
39    ) -> BoxFuture<'a, Result<Option<u64>, String>>;
40
41    /// Compare-and-delete: remove `key` only if it still holds `token`.
42    fn release<'a>(&'a self, key: &'a str, token: &'a str) -> BoxFuture<'a, Result<bool, String>>;
43
44    /// Extend the TTL only if `key` still holds `token`.
45    fn renew<'a>(
46        &'a self,
47        key: &'a str,
48        token: &'a str,
49        ttl_ms: u64,
50    ) -> BoxFuture<'a, Result<bool, String>>;
51}
52
53// ─── Guard ────────────────────────────────────────────────────────────────────
54
55/// Live ownership of a distributed lock. Dropping it stops the heartbeat and
56/// releases the key (compare-and-delete — never deletes a successor's lock).
57pub struct LockGuard {
58    /// Cluster-monotonic fencing token — pass to downstream writes.
59    pub fencing_token: u64,
60    key: String,
61    token: String,
62    backend: Arc<dyn DLockBackend>,
63    renew: Option<tokio::task::JoinHandle<()>>,
64}
65
66impl Drop for LockGuard {
67    fn drop(&mut self) {
68        if let Some(h) = self.renew.take() {
69            h.abort();
70        }
71        let (backend, key, token) = (self.backend.clone(), self.key.clone(), self.token.clone());
72        tokio::spawn(async move {
73            if let Err(e) = backend.release(&key, &token).await {
74                tracing::warn!(key = %key, error = %e, "lock release failed (TTL will reap)");
75            }
76        });
77    }
78}
79
80// ─── The lock ─────────────────────────────────────────────────────────────────
81
/// `const`-constructible named lock definition.
///
/// Declare one per critical section as a `static`/`const`. The backend comes
/// from DI (`Arc<dyn DLockBackend>`) at call time, so the same static works
/// across environments. Plain `&'static str` + `u64` data, hence `Copy`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct DistributedLock {
    /// Logical lock name; `try_lock` namespaces it as `arcly:lock:<name>`.
    pub name: &'static str,
    /// Lease length in milliseconds; the heartbeat renews every `ttl_ms / 3`.
    pub ttl_ms: u64,
}
88
89impl DistributedLock {
90    pub const fn new(name: &'static str, ttl_ms: u64) -> Self {
91        Self { name, ttl_ms }
92    }
93
94    /// Non-blocking acquisition. `Ok(None)` = another holder exists (or the
95    /// backend is down — conservative). On success the guard auto-renews
96    /// every `ttl/3` until dropped.
97    pub async fn try_lock(&self, backend: &Arc<dyn DLockBackend>) -> Option<LockGuard> {
98        let key = format!("arcly:lock:{}", self.name);
99        let token = random_token();
100
101        match backend.try_acquire(&key, &token, self.ttl_ms).await {
102            Ok(Some(fencing_token)) => {
103                let renew =
104                    spawn_heartbeat(backend.clone(), key.clone(), token.clone(), self.ttl_ms);
105                Some(LockGuard {
106                    fencing_token,
107                    key,
108                    token,
109                    backend: backend.clone(),
110                    renew: Some(renew),
111                })
112            }
113            Ok(None) => None,
114            Err(e) => {
115                metrics::counter!("dlock_backend_errors_total").increment(1);
116                tracing::warn!(lock = self.name, error = %e,
117                    "lock backend unreachable — treating as not acquired");
118                None
119            }
120        }
121    }
122}
123
124fn spawn_heartbeat(
125    backend: Arc<dyn DLockBackend>,
126    key: String,
127    token: String,
128    ttl_ms: u64,
129) -> tokio::task::JoinHandle<()> {
130    tokio::spawn(async move {
131        let mut tick = tokio::time::interval(Duration::from_millis((ttl_ms / 3).max(1)));
132        tick.tick().await; // skip immediate
133        loop {
134            tick.tick().await;
135            match backend.renew(&key, &token, ttl_ms).await {
136                Ok(true) => {}
137                Ok(false) => {
138                    // We lost the lock (TTL lapsed and someone else took it).
139                    // Stop renewing; the guard's fencing token is now stale and
140                    // downstream compare-writes will reject it.
141                    tracing::warn!(key = %key, "lock lost — heartbeat stopping");
142                    return;
143                }
144                Err(e) => tracing::warn!(key = %key, error = %e, "lock renew failed"),
145            }
146        }
147    })
148}
149
/// Generates the per-acquisition owner token (32 lowercase hex chars).
///
/// The token must be unique cluster-wide. If two replicas ever held the same
/// token, one's compare-and-delete could remove the other's lock.
///
/// `DefaultHasher::new()` uses fixed keys, and the PID is frequently identical
/// across container replicas. So each 64-bit half is instead hashed with a
/// `RandomState`, which the standard library seeds from OS randomness. Time,
/// PID, thread id and a process-local sequence are still mixed in as extra
/// input, so tokens stay distinct within a process as well.
fn random_token() -> String {
    use std::collections::hash_map::RandomState;
    use std::hash::{BuildHasher, Hash, Hasher};
    use std::sync::atomic::{AtomicU64, Ordering};

    static SEQ: AtomicU64 = AtomicU64::new(0);
    let seq = SEQ.fetch_add(1, Ordering::Relaxed);
    let now = std::time::SystemTime::now();
    let pid = std::process::id();
    let thread = std::thread::current().id();

    // Each half gets its own randomly keyed hasher; `lane` separates them.
    let word = |lane: u8| -> u64 {
        let mut h = RandomState::new().build_hasher();
        (lane, seq, now, pid, thread).hash(&mut h);
        h.finish()
    };
    format!("{:016x}{:016x}", word(0), word(1))
}
160}