rust-job-queue-api-worker-system 0.1.0

A production-shaped Rust job queue: Axum API + async workers + Postgres SKIP LOCKED dequeue, retries with decorrelated jitter, idempotency, cooperative cancellation, OpenAPI, Prometheus metrics.
//! Retry-backoff math.
//!
//! Implements a stateless approximation of the AWS Architecture Blog's
//! "decorrelated jitter" pattern. The canonical sequence is:
//!
//! ```text
//! sleep_n = min(cap, random_between(base, sleep_{n-1} * 3))
//! ```
//!
//! which requires persisting the previous sleep. We don't persist that:
//! the queue table has no `last_backoff_ms` column, and adding one would
//! couple the backoff policy to the schema. Instead we bucket by attempt
//! count:
//!
//! ```text
//! sleep_n = random_between(base, min(cap, base * 3^n))
//! ```
//!
//! The distribution shape is the same (uniform over a window that grows
//! geometrically until it hits the cap), which is what decorrelated
//! jitter is for: spreading retries across workers so they don't
//! synchronise into a thundering-herd retry storm.

use std::time::Duration;

use rand::Rng;

/// Minimum backoff. No attempt waits less than 1 second.
///
/// One second is the "you have time to notice and observe" floor.
/// Smaller values would let a tight-fail-loop job hammer the queue;
/// larger values would over-delay the first retry of a genuinely
/// transient failure.
pub const BASE_MS: u64 = 1_000;

/// Maximum backoff. No attempt waits more than 60 seconds.
///
/// 60 s is the "operator notices within a minute" ceiling. With
/// `max_attempts = 3` and the geometric growth, a job that exhausts its
/// attempts spans at most ~3 minutes of wall clock before lands in
/// `failed_permanent`.
pub const CAP_MS: u64 = 60_000;

/// Compute the next wait before retrying a job that just failed.
///
/// `attempt` is the just-completed attempt number (1-indexed). The
/// returned `Duration` is the time to wait before the next attempt is
/// eligible for dequeue (i.e., what we set as `run_at - now()` on the
/// row).
///
/// The return is drawn uniformly from `[BASE_MS,
/// min(CAP_MS, BASE_MS ยท 3^attempt)]`. For `attempt = 1` the window is
/// `[1s, 3s]`; for `attempt = 2` it's `[1s, 9s]`; for `attempt = 4` it's
/// already saturated at `[1s, 60s]`.
///
/// `attempt = 0` is treated as `attempt = 1` for safety; the queue's
/// guard guarantees we never call this with zero attempts in practice.
pub fn backoff_for_attempt(attempt: i32) -> Duration {
    let attempt_u = attempt.max(1) as u32;
    // Cap the exponent before exponentiation; `3u64.pow(20)` overflows
    // u64. `3^10 = 59049 * BASE_MS = 59 s`, already past CAP_MS at the
    // default `BASE_MS = 1s`, so capping at 10 loses no expressiveness.
    let exp = attempt_u.min(10);
    let upper_ms = BASE_MS.saturating_mul(3u64.saturating_pow(exp)).min(CAP_MS);
    let ms = if upper_ms <= BASE_MS {
        // Degenerate range: would panic in `random_range`. Just return
        // the floor.
        BASE_MS
    } else {
        rand::rng().random_range(BASE_MS..=upper_ms)
    };
    Duration::from_millis(ms)
}

#[cfg(test)]
mod tests {
    use super::*;

    // 200 samples is enough to catch off-by-one boundary bugs in the
    // range expression without making the test suite slow.

    #[test]
    fn first_attempt_within_bounds() {
        // Window for attempt 1 is [BASE_MS, BASE_MS * 3].
        for _ in 0..200 {
            let d = backoff_for_attempt(1).as_millis() as u64;
            assert!((BASE_MS..=BASE_MS * 3).contains(&d), "got {d}");
        }
    }

    #[test]
    fn second_attempt_widens() {
        // Window for attempt 2 is [BASE_MS, BASE_MS * 9].
        for _ in 0..200 {
            let d = backoff_for_attempt(2).as_millis() as u64;
            assert!((BASE_MS..=BASE_MS * 9).contains(&d), "got {d}");
        }
    }

    #[test]
    fn caps_at_60s() {
        // Saturating against CAP_MS means even at attempt 20 we never
        // wait more than the cap.
        for _ in 0..200 {
            let d = backoff_for_attempt(20).as_millis() as u64;
            assert!(d <= CAP_MS, "got {d}");
        }
    }

    #[test]
    fn attempt_zero_treated_as_one() {
        // Defensive: callers should never pass 0, but if they do, the
        // function must not panic.
        let d = backoff_for_attempt(0).as_millis() as u64;
        assert!((BASE_MS..=BASE_MS * 3).contains(&d), "got {d}");
    }

    #[test]
    fn never_below_base() {
        // Floor is BASE_MS regardless of attempt count.
        for _ in 0..200 {
            let d = backoff_for_attempt(1).as_millis() as u64;
            assert!(d >= BASE_MS, "got {d}");
        }
    }
}