arcly_http/resilience/dlock.rs
1//! Distributed lock with fencing tokens — cluster-wide single-holder sections.
2//!
3//! Solves "exactly one replica should do this" (outbox relays, cron jobs,
4//! schema migrations) without per-process assumptions. Three properties a
5//! naive Redis `SET NX` misses, all enforced here:
6//!
7//! 1. **Fencing tokens** — every successful acquire returns a cluster-wide
8//! monotonic `u64`. Downstream writers persist/compare it, so a holder
9//! that lost its lock during a GC pause (TTL expired, someone else
10//! acquired) writes with a *stale* token and is rejected by storage —
11//! the classic Kleppmann lock-safety argument.
12//! 2. **Compare-and-delete release** — release only removes the key when the
13//! stored random token matches ours (Lua CAD), so an expired holder can
14//! never delete its successor's lock.
15//! 3. **Auto-renew heartbeat** — a background task renews at `ttl/3` while
16//! the guard lives; dropping the guard aborts the heartbeat and releases.
17//!
18//! Failure policy is conservative: if the backend is unreachable the lock is
19//! reported as *not acquired* — for leader-election workloads, "nobody runs"
20//! is recoverable; "everybody runs" may not be.
21
22use std::sync::Arc;
23use std::time::Duration;
24
25use futures::future::BoxFuture;
26
27// ─── Backend contract ─────────────────────────────────────────────────────────
28
29/// Storage backend (Redis in production). Implementations MUST make
30/// `try_acquire` atomic (single Lua script / transaction).
31pub trait DLockBackend: Send + Sync + 'static {
32 /// Atomically: if `key` is free, store `token` with `ttl_ms` and return
33 /// `Some(fencing_token)` from a monotonic cluster counter; else `None`.
34 fn try_acquire<'a>(
35 &'a self,
36 key: &'a str,
37 token: &'a str,
38 ttl_ms: u64,
39 ) -> BoxFuture<'a, Result<Option<u64>, String>>;
40
41 /// Compare-and-delete: remove `key` only if it still holds `token`.
42 fn release<'a>(&'a self, key: &'a str, token: &'a str) -> BoxFuture<'a, Result<bool, String>>;
43
44 /// Extend the TTL only if `key` still holds `token`.
45 fn renew<'a>(
46 &'a self,
47 key: &'a str,
48 token: &'a str,
49 ttl_ms: u64,
50 ) -> BoxFuture<'a, Result<bool, String>>;
51}
52
53// ─── Guard ────────────────────────────────────────────────────────────────────
54
55/// Live ownership of a distributed lock. Dropping it stops the heartbeat and
56/// releases the key (compare-and-delete — never deletes a successor's lock).
57pub struct LockGuard {
58 /// Cluster-monotonic fencing token — pass to downstream writes.
59 pub fencing_token: u64,
60 key: String,
61 token: String,
62 backend: Arc<dyn DLockBackend>,
63 renew: Option<tokio::task::JoinHandle<()>>,
64}
65
66impl Drop for LockGuard {
67 fn drop(&mut self) {
68 if let Some(h) = self.renew.take() {
69 h.abort();
70 }
71 let (backend, key, token) = (self.backend.clone(), self.key.clone(), self.token.clone());
72 tokio::spawn(async move {
73 if let Err(e) = backend.release(&key, &token).await {
74 tracing::warn!(key = %key, error = %e, "lock release failed (TTL will reap)");
75 }
76 });
77 }
78}
79
80// ─── The lock ─────────────────────────────────────────────────────────────────
81
82/// `const`-constructible named lock. The backend comes from DI
83/// (`Arc<dyn DLockBackend>`), so the same static works across environments.
84pub struct DistributedLock {
85 pub name: &'static str,
86 pub ttl_ms: u64,
87}
88
89impl DistributedLock {
90 pub const fn new(name: &'static str, ttl_ms: u64) -> Self {
91 Self { name, ttl_ms }
92 }
93
94 /// Non-blocking acquisition. `Ok(None)` = another holder exists (or the
95 /// backend is down — conservative). On success the guard auto-renews
96 /// every `ttl/3` until dropped.
97 pub async fn try_lock(&self, backend: &Arc<dyn DLockBackend>) -> Option<LockGuard> {
98 let key = format!("arcly:lock:{}", self.name);
99 let token = random_token();
100
101 match backend.try_acquire(&key, &token, self.ttl_ms).await {
102 Ok(Some(fencing_token)) => {
103 let renew =
104 spawn_heartbeat(backend.clone(), key.clone(), token.clone(), self.ttl_ms);
105 Some(LockGuard {
106 fencing_token,
107 key,
108 token,
109 backend: backend.clone(),
110 renew: Some(renew),
111 })
112 }
113 Ok(None) => None,
114 Err(e) => {
115 metrics::counter!("dlock_backend_errors_total").increment(1);
116 tracing::warn!(lock = self.name, error = %e,
117 "lock backend unreachable — treating as not acquired");
118 None
119 }
120 }
121 }
122}
123
124fn spawn_heartbeat(
125 backend: Arc<dyn DLockBackend>,
126 key: String,
127 token: String,
128 ttl_ms: u64,
129) -> tokio::task::JoinHandle<()> {
130 tokio::spawn(async move {
131 let mut tick = tokio::time::interval(Duration::from_millis((ttl_ms / 3).max(1)));
132 tick.tick().await; // skip immediate
133 loop {
134 tick.tick().await;
135 match backend.renew(&key, &token, ttl_ms).await {
136 Ok(true) => {}
137 Ok(false) => {
138 // We lost the lock (TTL lapsed and someone else took it).
139 // Stop renewing; the guard's fencing token is now stale and
140 // downstream compare-writes will reject it.
141 tracing::warn!(key = %key, "lock lost — heartbeat stopping");
142 return;
143 }
144 Err(e) => tracing::warn!(key = %key, error = %e, "lock renew failed"),
145 }
146 }
147 })
148}
149
/// Builds the owner token stored as the lock value for one acquisition.
///
/// The result is 32 lowercase hex characters: a 64-bit hash followed by the
/// process id, both zero-padded to 16 digits.
///
/// The hash mixes wall-clock time, the current thread id and a process-wide
/// counter through a hasher obtained from a fresh `RandomState`, whose keys
/// are randomly seeded. A hasher from `DefaultHasher::new()` always uses the
/// same fixed keys, which would make the token a pure function of those
/// inputs — and replicas running in containers frequently share pid and
/// thread ids.
fn random_token() -> String {
    use std::collections::hash_map::RandomState;
    use std::hash::{BuildHasher, Hash, Hasher};
    use std::sync::atomic::{AtomicU64, Ordering};

    // Only distinctness of the counter values matters, so `Relaxed` suffices.
    static SEQ: AtomicU64 = AtomicU64::new(0);

    let mut h = RandomState::new().build_hasher();
    std::time::SystemTime::now().hash(&mut h);
    std::thread::current().id().hash(&mut h);
    SEQ.fetch_add(1, Ordering::Relaxed).hash(&mut h);
    format!("{:016x}{:016x}", h.finish(), u64::from(std::process::id()))
}