arcly-http 0.3.0

Enterprise-grade NestJS-inspired web framework on axum: zero-lock DI, declarative controllers, multi-tenant data routing, transactional outbox, ABAC, and a self-documenting OpenAPI surface
Documentation
//! Distributed lock with fencing tokens — cluster-wide single-holder sections.
//!
//! Solves "exactly one replica should do this" (outbox relays, cron jobs,
//! schema migrations) without per-process assumptions. Three properties a
//! naive Redis `SET NX` misses, all enforced here:
//!
//! 1. **Fencing tokens** — every successful acquire returns a cluster-wide
//!    monotonic `u64`. Downstream writers persist/compare it, so a holder
//!    that lost its lock during a GC pause (TTL expired, someone else
//!    acquired) writes with a *stale* token and is rejected by storage —
//!    the classic Kleppmann lock-safety argument.
//! 2. **Compare-and-delete release** — release only removes the key when the
//!    stored random token matches ours (Lua CAD), so an expired holder can
//!    never delete its successor's lock.
//! 3. **Auto-renew heartbeat** — a background task renews the TTL every
//!    `ttl/3` while the guard lives; dropping the guard aborts the heartbeat
//!    and releases the key.
//!
//! Failure policy is conservative: if the backend is unreachable the lock is
//! reported as *not acquired* — for leader-election workloads, "nobody runs"
//! is recoverable; "everybody runs" may not be.

use std::sync::Arc;
use std::time::Duration;

use futures::future::BoxFuture;

// ─── Backend contract ─────────────────────────────────────────────────────────

/// Storage backend (Redis in production). Implementations MUST make
/// `try_acquire` atomic (single Lua script / transaction).
///
/// The lock code in this module holds the backend as `Arc<dyn DLockBackend>`
/// and calls it from spawned tasks (heartbeat, release-on-drop), hence the
/// `Send + Sync + 'static` bound. Every method reports backend failures as
/// `Err(String)`; callers here log them rather than propagate them.
pub trait DLockBackend: Send + Sync + 'static {
    /// Atomically: if `key` is free, store `token` with `ttl_ms` and return
    /// `Some(fencing_token)` from a monotonic cluster counter; else `None`.
    ///
    /// `token` is the caller's per-acquisition random identity; it is what
    /// `release` and `renew` later compare against.
    ///
    /// # Errors
    /// `Err` means the outcome is unknown (backend unreachable, etc.).
    /// `DistributedLock::try_lock` treats it as "not acquired".
    fn try_acquire<'a>(
        &'a self,
        key: &'a str,
        token: &'a str,
        ttl_ms: u64,
    ) -> BoxFuture<'a, Result<Option<u64>, String>>;

    /// Compare-and-delete: remove `key` only if it still holds `token`.
    ///
    /// NOTE(review): presumably `Ok(true)` = key deleted, `Ok(false)` = key
    /// absent or held by another token; callers in this module only inspect
    /// `Err`. The compare and the delete must be one atomic step (the module
    /// docs describe a Lua CAD) or a successor's lock could be removed.
    fn release<'a>(&'a self, key: &'a str, token: &'a str) -> BoxFuture<'a, Result<bool, String>>;

    /// Extend the TTL only if `key` still holds `token`.
    ///
    /// The heartbeat treats `Ok(false)` as "lock lost" and stops renewing;
    /// on `Err` it logs and retries on its next tick.
    /// NOTE(review): like `release`, the check and the extend should be
    /// atomic so a successor's key is never extended by a stale holder.
    fn renew<'a>(
        &'a self,
        key: &'a str,
        token: &'a str,
        ttl_ms: u64,
    ) -> BoxFuture<'a, Result<bool, String>>;
}

// ─── Guard ────────────────────────────────────────────────────────────────────

/// Live ownership of a distributed lock. Dropping it stops the heartbeat and
/// releases the key (compare-and-delete — never deletes a successor's lock).
///
/// Holding the guard is what keeps the lock: binding it to `_` or discarding
/// it releases the lock immediately, hence `#[must_use]`.
#[must_use = "dropping the guard releases the lock immediately"]
pub struct LockGuard {
    /// Cluster-monotonic fencing token — pass to downstream writes.
    pub fencing_token: u64,
    /// Backend key (`arcly:lock:<name>`).
    key: String,
    /// Random per-acquisition identity used for compare-and-delete / renew.
    token: String,
    /// Backend used for renewals and the release on drop.
    backend: Arc<dyn DLockBackend>,
    /// Heartbeat task; taken and aborted on drop.
    renew: Option<tokio::task::JoinHandle<()>>,
}

impl Drop for LockGuard {
    fn drop(&mut self) {
        if let Some(h) = self.renew.take() {
            h.abort();
        }
        let (backend, key, token) = (self.backend.clone(), self.key.clone(), self.token.clone());
        tokio::spawn(async move {
            if let Err(e) = backend.release(&key, &token).await {
                tracing::warn!(key = %key, error = %e, "lock release failed (TTL will reap)");
            }
        });
    }
}

// ─── The lock ─────────────────────────────────────────────────────────────────

/// `const`-constructible named lock. The backend comes from DI
/// (`Arc<dyn DLockBackend>`), so the same static works across environments.
///
/// This is a plain value (a name plus a TTL): it holds no connection or state,
/// so it is cheap to copy, compare, and print.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct DistributedLock {
    /// Lock name; the backend key is `arcly:lock:<name>`.
    pub name: &'static str,
    /// Lease length in milliseconds; the heartbeat renews every `ttl_ms / 3`.
    pub ttl_ms: u64,
}

impl DistributedLock {
    pub const fn new(name: &'static str, ttl_ms: u64) -> Self {
        Self { name, ttl_ms }
    }

    /// Non-blocking acquisition. `Ok(None)` = another holder exists (or the
    /// backend is down — conservative). On success the guard auto-renews
    /// every `ttl/3` until dropped.
    pub async fn try_lock(&self, backend: &Arc<dyn DLockBackend>) -> Option<LockGuard> {
        let key = format!("arcly:lock:{}", self.name);
        let token = random_token();

        match backend.try_acquire(&key, &token, self.ttl_ms).await {
            Ok(Some(fencing_token)) => {
                let renew =
                    spawn_heartbeat(backend.clone(), key.clone(), token.clone(), self.ttl_ms);
                Some(LockGuard {
                    fencing_token,
                    key,
                    token,
                    backend: backend.clone(),
                    renew: Some(renew),
                })
            }
            Ok(None) => None,
            Err(e) => {
                metrics::counter!("dlock_backend_errors_total").increment(1);
                tracing::warn!(lock = self.name, error = %e,
                    "lock backend unreachable — treating as not acquired");
                None
            }
        }
    }
}

fn spawn_heartbeat(
    backend: Arc<dyn DLockBackend>,
    key: String,
    token: String,
    ttl_ms: u64,
) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        let mut tick = tokio::time::interval(Duration::from_millis((ttl_ms / 3).max(1)));
        tick.tick().await; // skip immediate
        loop {
            tick.tick().await;
            match backend.renew(&key, &token, ttl_ms).await {
                Ok(true) => {}
                Ok(false) => {
                    // We lost the lock (TTL lapsed and someone else took it).
                    // Stop renewing; the guard's fencing token is now stale and
                    // downstream compare-writes will reject it.
                    tracing::warn!(key = %key, "lock lost — heartbeat stopping");
                    return;
                }
                Err(e) => tracing::warn!(key = %key, error = %e, "lock renew failed"),
            }
        }
    })
}

/// Generates a 32-hex-char identity for one lock acquisition.
///
/// The compare-and-delete safety argument depends on this token never
/// colliding with another holder's, including holders on other hosts. Each
/// 64-bit half comes from its own `RandomState` hasher; std seeds those with
/// OS randomness per thread, so two replicas do not collide even when they
/// share pid, thread id, sequence number and clock reading (common in
/// containers). A fixed-key hasher would produce identical tokens in that case.
/// The time / pid / thread / sequence inputs add uniqueness within a process.
fn random_token() -> String {
    use std::collections::hash_map::RandomState;
    use std::hash::{BuildHasher, Hash, Hasher};
    use std::sync::atomic::{AtomicU64, Ordering};

    static SEQ: AtomicU64 = AtomicU64::new(0);

    let seq = SEQ.fetch_add(1, Ordering::Relaxed);
    let now = std::time::SystemTime::now();
    let pid = std::process::id();
    let thread = std::thread::current().id();

    // Every `RandomState::new()` carries distinct keys, so the halves are independent.
    let half = |salt: u8| {
        let mut h = RandomState::new().build_hasher();
        salt.hash(&mut h);
        now.hash(&mut h);
        pid.hash(&mut h);
        thread.hash(&mut h);
        seq.hash(&mut h);
        h.finish()
    };
    format!("{:016x}{:016x}", half(0), half(1))
}