rust-job-queue-api-worker-system 0.1.0

A production-shaped Rust job queue: Axum API + async workers + Postgres SKIP LOCKED dequeue, retries with decorrelated jitter, idempotency, cooperative cancellation, OpenAPI, Prometheus metrics.
# Architecture

## Crate shape

One Cargo package, `rust-job-queue-api-worker-system`. Two optional feature flags plus an always-compiled core carve out three audiences:

| Feature | On by default? | What it adds |
|---|---|---|
| _(core — no feature gate)_ | always | Domain types, payload validation, `PgPool`/migrations, the queue SQL, retry math |
| `api` | yes | Axum router, HTTP error mapping, OpenAPI assembly, Swagger UI, Prometheus exporter |
| `worker` | yes | `Executor` trait, `SimulatedExecutor`, `WorkerRuntime`, recovery sweep |

Three binaries, two of them gated by `required-features`:

| Binary | Feature required | Purpose |
|---|---|---|
| `job-queue-api` | `api` | HTTP server |
| `job-queue-worker` | `worker` | Async job processor |
| `job-queue-migrate` | _(none)_ | One-shot migration runner |

CI checks the gating: three of the steps in [.github/workflows/ci.yml](../.github/workflows/ci.yml) run `cargo check --no-default-features --features X` to confirm each feature compiles in isolation.
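
For orientation, a minimal sketch of how the gating plays out at the module level — the module names here are assumptions, not lifted from the source tree:

```rust
// lib.rs — hedged sketch; the actual module layout may differ.
pub mod core;                 // always compiled: domain types, queue SQL, retry math

#[cfg(feature = "api")]
pub mod api;                  // Axum router, OpenAPI assembly, /metrics exporter

#[cfg(feature = "worker")]
pub mod worker;               // Executor trait, WorkerRuntime, recovery sweep
```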

## Data model

```
jobs
├── id               UUID PK             — Uuid v7, time-ordered
├── kind             job_kind ENUM       — send_email | resize_image | summarize_text | webhook_delivery
├── payload          JSONB               — opaque to the queue; validated by core::payload::validate
├── status           job_status ENUM     — queued | running | succeeded | retrying | failed_permanent | cancelled
├── attempts         INT                 — 0 at insert; +1 at each dequeue
├── max_attempts     INT                 — 3 by default
├── last_error       TEXT                — most recent error message; kept even if a later retry succeeds
├── run_at           TIMESTAMPTZ         — earliest time the row is eligible for dequeue
├── locked_at        TIMESTAMPTZ         — diagnostic + recovery sweep
├── locked_by        TEXT                — worker id, diagnostic
├── cancel_requested BOOLEAN             — cooperative-cancel flag for running jobs
├── idempotency_key  TEXT                — nullable; partial unique index
├── created_at       TIMESTAMPTZ
└── updated_at       TIMESTAMPTZ
```

Four indices:
- **`jobs_ready_idx` on `(run_at)` WHERE `status IN ('queued','retrying')`** — the partial index the SKIP LOCKED dequeue scans. Narrow and hot.
- **`jobs_idempotency_key_uidx` on `(idempotency_key)` WHERE `idempotency_key IS NOT NULL`** — partial UNIQUE so multiple NULL keys are allowed.
- **`jobs_status_idx` on `(status)`** — supports status-filtered list queries.
- **`jobs_created_at_idx` on `(created_at DESC)`** — supports the default reverse-chronological list.

Plus three CHECK constraints (`attempts >= 0`, `max_attempts >= 1`, `attempts <= max_attempts`) so the DB enforces the invariants even when application code has a bug.
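
A hedged sketch of how that schema might map onto the core domain types — the derives and field names are assumptions, not the crate's actual definitions:

```rust
use chrono::{DateTime, Utc};
use serde_json::Value;
use uuid::Uuid;

// Hedged sketch: the enum/struct shape implied by the schema above.
#[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type)]
#[sqlx(type_name = "job_kind", rename_all = "snake_case")]
pub enum JobKind { SendEmail, ResizeImage, SummarizeText, WebhookDelivery }

#[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type)]
#[sqlx(type_name = "job_status", rename_all = "snake_case")]
pub enum JobStatus { Queued, Running, Succeeded, Retrying, FailedPermanent, Cancelled }

#[derive(Debug, sqlx::FromRow)]
pub struct JobRow {
    pub id: Uuid,
    pub kind: JobKind,
    pub payload: Value,                    // opaque JSONB
    pub status: JobStatus,
    pub attempts: i32,
    pub max_attempts: i32,
    pub last_error: Option<String>,
    pub run_at: DateTime<Utc>,
    pub locked_at: Option<DateTime<Utc>>,
    pub locked_by: Option<String>,
    pub cancel_requested: bool,
    pub idempotency_key: Option<String>,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
}
```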

## Status lifecycle

```mermaid
stateDiagram-v2
    [*] --> queued: enqueue
    queued --> running: SKIP LOCKED dequeue
    queued --> cancelled: request_cancel
    running --> succeeded: mark_succeeded
    running --> retrying: mark_failed_or_retry (attempts < max)
    running --> failed_permanent: mark_failed_or_retry (attempts >= max)
    running --> cancelled: cancel_requested observed
    retrying --> running: SKIP LOCKED dequeue (after run_at <= now())
    retrying --> cancelled: request_cancel
    succeeded --> [*]
    failed_permanent --> [*]
    cancelled --> [*]
```

## The dequeue (the centerpiece)

```sql
UPDATE jobs
   SET status     = 'running'::job_status,
       attempts   = attempts + 1,
       locked_at  = now(),
       locked_by  = $1,
       updated_at = now()
 WHERE id = (
       SELECT id FROM jobs
        WHERE status IN ('queued','retrying')
          AND run_at <= now()
          AND cancel_requested = FALSE
        ORDER BY run_at
        FOR UPDATE SKIP LOCKED
        LIMIT 1
 )
RETURNING <columns>;
```

Single statement, atomic. The inner `SELECT ... FOR UPDATE SKIP LOCKED` is the canonical Postgres job-queue pattern (also used by `sqlxmq`, `graphile-worker`, `Que` for Ruby, `Oban` for Elixir). Each worker sees a disjoint subset of rows; locked rows are skipped, not waited on. Empirically validated by [tests/concurrency_two_workers.rs](../tests/concurrency_two_workers.rs): 200 jobs across 8 worker tasks, every job processed exactly once.

Idle workers poll at 500 ms, backing off to a 2 s ceiling. There is no LISTEN/NOTIFY — see [tradeoffs.md](tradeoffs.md).
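
A hedged sketch of that idle loop; `dequeue_one` (wrapping the SKIP LOCKED UPDATE above) and `process` are hypothetical helper names, and the constants come from the text:

```rust
use std::time::Duration;
use sqlx::PgPool;

// Hedged sketch: 500 ms base poll, doubling to a 2 s ceiling while idle.
async fn poll_loop(pool: PgPool, worker_id: String) -> sqlx::Result<()> {
    let base = Duration::from_millis(500);
    let ceiling = Duration::from_secs(2);
    let mut idle_wait = base;
    loop {
        match dequeue_one(&pool, &worker_id).await? {
            Some(job) => {
                idle_wait = base; // reset the backoff as soon as work appears
                process(&pool, job).await;
            }
            None => {
                tokio::time::sleep(idle_wait).await;
                idle_wait = (idle_wait * 2).min(ceiling);
            }
        }
    }
}
```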

## Enqueue path

```mermaid
sequenceDiagram
    participant Client
    participant API as /jobs (POST)
    participant DB as Postgres
    Client->>API: POST {kind, payload, ...} + Idempotency-Key header
    API->>API: validate payload (core::payload::validate)
    API->>DB: INSERT ... ON CONFLICT (idempotency_key) DO NOTHING RETURNING *
    alt new row
        DB-->>API: row
        API-->>Client: 201 Created + JobResponse
    else key already present
        DB-->>API: no rows
        API->>DB: SELECT * WHERE idempotency_key = $1
        DB-->>API: existing row
        API-->>Client: 200 OK + JobResponse
    end
```
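
In sqlx terms, the handler's branch might look roughly like this. The exact query text and types are assumptions (`JobRow`/`JobKind` are the sketch types from the data-model section); note that targeting a partial unique index in `ON CONFLICT` requires repeating the index predicate:

```rust
use axum::{http::StatusCode, Json};
use sqlx::PgPool;
use uuid::Uuid;

// Hedged sketch of the insert-then-refetch flow in the diagram above.
async fn enqueue(
    pool: &PgPool,
    kind: JobKind,
    payload: serde_json::Value,
    key: Option<String>,
) -> sqlx::Result<(StatusCode, Json<JobRow>)> {
    let inserted = sqlx::query_as::<_, JobRow>(
        "INSERT INTO jobs (id, kind, payload, idempotency_key)
         VALUES ($1, $2, $3, $4)
         ON CONFLICT (idempotency_key) WHERE idempotency_key IS NOT NULL DO NOTHING
         RETURNING *",
    )
    .bind(Uuid::now_v7())
    .bind(kind)
    .bind(&payload)
    .bind(&key)
    .fetch_optional(pool)
    .await?;

    match (inserted, key) {
        (Some(row), _) => Ok((StatusCode::CREATED, Json(row))), // new row → 201
        (None, Some(key)) => {
            // Duplicate key: return the original job unchanged → 200.
            let existing =
                sqlx::query_as::<_, JobRow>("SELECT * FROM jobs WHERE idempotency_key = $1")
                    .bind(key)
                    .fetch_one(pool)
                    .await?;
            Ok((StatusCode::OK, Json(existing)))
        }
        (None, None) => unreachable!("an insert without an idempotency key cannot conflict"),
    }
}
```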

## Retry policy

Decorrelated-jitter approximation. The next wait is uniformly drawn from `[BASE_MS, min(CAP_MS, BASE_MS · 3^attempt)]` with `BASE_MS = 1_000` and `CAP_MS = 60_000`. This matches the spread shape of the AWS Architecture Blog's decorrelated-jitter sequence without needing to persist the previous wait — bucketing by attempt count is enough.

On a failure, `mark_failed_or_retry`:
- if `attempts >= max_attempts` → status becomes `failed_permanent`, `run_at` unchanged.
- otherwise → status becomes `retrying`, `run_at = now() + backoff(attempts)`.

Workers naturally pick up retries because the dequeue clause includes `run_at <= now()` — no separate scheduler.
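
A hedged sketch of the retry math as described, with the constants from the text and a `rand` 0.8-style API assumed:

```rust
use rand::Rng;

const BASE_MS: u64 = 1_000;
const CAP_MS: u64 = 60_000;

/// Hedged sketch: uniform draw from [BASE_MS, min(CAP_MS, BASE_MS * 3^attempt)].
fn backoff_ms(attempt: u32) -> u64 {
    let upper = BASE_MS
        .saturating_mul(3u64.saturating_pow(attempt))
        .min(CAP_MS);
    rand::thread_rng().gen_range(BASE_MS..=upper)
}

// attempt = 1 → [1_000, 3_000] ms; attempt = 3 → [1_000, 27_000] ms; attempt ≥ 4 → capped at [1_000, 60_000] ms.
```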

## Cancellation

Three-way semantics:
- `queued | retrying`: atomic UPDATE to `cancelled`. Returns `CancelOutcome::CancelledNow` → HTTP 200.
- `running`: set `cancel_requested = TRUE`. The worker re-reads this flag between sub-steps in [src/worker/executor.rs](../src/worker/executor.rs) and transitions to `cancelled` on observation. Returns `CancelOutcome::PendingOnWorker` → HTTP 202.
- terminal (`succeeded | failed_permanent | cancelled`): no-op. Returns `CancelOutcome::AlreadyTerminal(status)` → HTTP 409.

Tokio tasks cannot be safely preempted; cooperative cancellation is the only correct production pattern.
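
A hedged sketch of the worker-side check as it might appear between sub-steps; `run_sub_step` is a hypothetical stand-in for one unit of executor work:

```rust
// Hedged sketch: re-read cancel_requested at each sub-step boundary.
async fn run_with_cancel_checks(pool: &sqlx::PgPool, job: &JobRow) -> sqlx::Result<bool> {
    for step in 0..3 {
        // `cancel_requested` is the column set by POST /jobs/{id}/cancel on a running job.
        let cancel_requested: bool =
            sqlx::query_scalar("SELECT cancel_requested FROM jobs WHERE id = $1")
                .bind(job.id)
                .fetch_one(pool)
                .await?;

        if cancel_requested {
            sqlx::query(
                "UPDATE jobs SET status = 'cancelled'::job_status, updated_at = now() WHERE id = $1",
            )
            .bind(job.id)
            .execute(pool)
            .await?;
            return Ok(false); // cancelled before completion
        }

        run_sub_step(job, step).await; // hypothetical sub-step of the executor
    }
    Ok(true)
}
```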

## Graceful shutdown + recovery

On SIGINT/SIGTERM the worker bin flips a `CancellationToken`. The worker loops exit between jobs. An in-flight job that takes longer than the 30 s grace window is hard-aborted; its row stays in `running` with `locked_at` set. On the next startup, [src/worker/recovery.rs](../src/worker/recovery.rs) calls `queue::recover_stale` which resets any `running` row with `locked_at < now() - 5min` back to `retrying`.

This is the production-correct pattern: at no point do we mark a job failed because the worker process exited.
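
A hedged sketch of the recovery query; the exact SQL in `queue::recover_stale` may differ:

```rust
use sqlx::PgPool;

/// Hedged sketch of the startup sweep: push stale `running` rows back to `retrying`.
pub async fn recover_stale(pool: &PgPool) -> sqlx::Result<u64> {
    let res = sqlx::query(
        "UPDATE jobs
            SET status     = 'retrying'::job_status,
                locked_at  = NULL,
                locked_by  = NULL,
                updated_at = now()
          WHERE status = 'running'::job_status
            AND locked_at < now() - interval '5 minutes'",
    )
    .execute(pool)
    .await?;
    Ok(res.rows_affected())
}
```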

## Observability

- `tracing_subscriber` with JSON formatter when `RUST_LOG_FORMAT=json`, pretty otherwise.
- HTTP layer middleware (in order): `SetRequestIdLayer` → `TraceLayer` (with custom span carrying `request_id`) → `PropagateRequestIdLayer` → 30 s `TimeoutLayer` returning 504.
- Worker spans: `tracing::info_span!("job", job_id, kind, attempt, worker_id)` per claimed job.
- Prometheus metrics via `metrics-exporter-prometheus`. The API binary serves `/metrics` on the same port as its HTTP API (default 8080). The worker binary installs an independent HTTP listener on `WORKER_METRICS_BIND_ADDR` (default `0.0.0.0:9091`) so a single Prometheus scrape config can target each process. Worker emits `worker_jobs_started_total{kind}`, `worker_jobs_completed_total{kind,outcome}`, and `worker_job_duration_seconds{kind,outcome}` (histogram).
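
Roughly what the per-job instrumentation listed above could look like — a sketch only, assuming the `metrics` 0.22+ macro style and an `Executor::execute` signature that returns a `Result`:

```rust
use std::time::Instant;
use tracing::Instrument;

// Hedged sketch; metric names are from the list above, field/trait shapes are assumptions.
async fn instrumented_execute(executor: &impl Executor, job: &JobRow, worker_id: &str) {
    let span = tracing::info_span!(
        "job",
        job_id = %job.id,
        kind = ?job.kind,
        attempt = job.attempts,
        worker_id = %worker_id,
    );

    async {
        let kind = format!("{:?}", job.kind);
        metrics::counter!("worker_jobs_started_total", "kind" => kind.clone()).increment(1);

        let started = Instant::now();
        let outcome = match executor.execute(job).await {
            Ok(_) => "succeeded",
            Err(_) => "failed",
        };

        metrics::counter!("worker_jobs_completed_total", "kind" => kind.clone(), "outcome" => outcome)
            .increment(1);
        metrics::histogram!("worker_job_duration_seconds", "kind" => kind, "outcome" => outcome)
            .record(started.elapsed().as_secs_f64());
    }
    .instrument(span)
    .await;
}
```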

## Concurrency test

[tests/concurrency_two_workers.rs](../tests/concurrency_two_workers.rs) creates a separate `processed_log` table, enqueues 200 jobs, spins up 8 independent `WorkerRuntime` instances against the same Postgres, and asserts:

1. All 200 jobs reach `succeeded`.
2. `processed_log` has exactly 200 rows.
3. `processed_log` has exactly 200 **distinct** `job_id` values.
4. Every job has `attempts = 1` (each was claimed exactly once).
5. More than one worker actually participated (sanity: contention was exercised).

This is bounded empirical evidence for the tested schedule and harness. It does not claim global exactly-once semantics for crashes or arbitrary external side effects; duplicate processing in this test would show up as duplicate `job_id` rows in `processed_log`.
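
A hedged sketch of the assertions at the heart of the test (query text assumed):

```rust
// Hedged sketch of assertions 2-3: exactly 200 rows, all with distinct job_id values.
let total: i64 = sqlx::query_scalar("SELECT count(*) FROM processed_log")
    .fetch_one(&pool)
    .await?;
let distinct: i64 = sqlx::query_scalar("SELECT count(DISTINCT job_id) FROM processed_log")
    .fetch_one(&pool)
    .await?;

assert_eq!(total, 200, "every enqueued job must be processed");
assert_eq!(distinct, 200, "a duplicate job_id would mean a job was claimed twice");
```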

## Design decisions

Each subsection here is written in the shape of a PR description: the call that was made, the alternatives considered, and the reason the chosen alternative won.

### Single crate with feature flags (not a workspace of three crates)

**Decision**: one Cargo package, two optional features (`api`, `worker`), three binaries with `required-features` gating.

**Alternatives**: a workspace of three crates (`core` library + `api` binary + `worker` binary). This was the initial direction during the first build phases; the project was later retrofitted into the single-crate shape.

**Why this won**: a single crate publishes more cleanly to crates.io (one `cargo publish`, one README on the crate page, one set of metadata) while feature flags still keep HTTP and worker-specific dependencies off consumers who only want the queue types. This is the pattern `axum`, `tower-http`, and `tokio` use to publish modular libraries. The cost is that workspace-style separation between layers is sacrificed; the benefit is a smaller surface for consumers and a cleaner publishing story. CI checks the decoupling with three `cargo check --no-default-features --features X` probes.

### Polling-only dequeue (no LISTEN/NOTIFY)

**Decision**: workers poll the queue at 500 ms, backing off to a 2 s ceiling when idle. There is no `pg_notify` channel.

**Alternatives**: a hybrid that uses LISTEN/NOTIFY for low-latency wake-up plus polling as a fallback for missed notifications.

**Why this won**: the additional code path roughly doubles the worker's wakeup-and-dispatch surface for, at most, ~250 ms of latency saving on an idle queue. Under any meaningful load the workers are always behind the queue and polling makes no observable difference. For an intentionally-small artifact, the simpler shape wins. Adding NOTIFY later is a non-breaking change.

### Cooperative cancellation (not preemptive)

**Decision**: `POST /jobs/{id}/cancel` on a running job sets a `cancel_requested = TRUE` flag; the worker observes that flag between sub-steps in [`SimulatedExecutor::execute`] and transitions the row to `cancelled` only when it sees the flag.

**Alternatives**: pre-empt the task with `JoinHandle::abort()` immediately.

**Why this won**: Tokio tasks cannot be safely preempted mid-step. Any cleanup, transactional write, or external side-effect that the executor was about to perform would be lost if the task is killed asynchronously. Cooperative cancellation is the only correct production pattern for an async runtime that doesn't have task-level transactions. The trade-off is observable latency: a job is cancelled at the next sub-step boundary, not instantly.

### 5-minute stale-lock threshold

**Decision**: the recovery sweep only re-queues rows whose `locked_at < now() - interval '5 minutes'`.

**Alternatives**: a shorter threshold (30 s, 1 minute) to recover faster.

**Why this won**: a shorter threshold risks recovering jobs that are actually still being processed by a live worker. The worst case under 5 minutes is a 5-minute stall on a row whose worker truly crashed; the worst case under a 30-second threshold is *duplicate processing* of a row whose worker is currently mid-execution but holding a long external call. Duplicate side-effects are far worse than a 5-minute stall. The threshold is configurable via the `recover_stale_at_startup` argument; production deployments with shorter SLAs can tune it.

### `ON CONFLICT DO NOTHING` for idempotency (not `DO UPDATE`)

**Decision**: when a duplicate `idempotency_key` is observed during `enqueue`, the insert is rejected (`DO NOTHING`); the application then re-fetches the existing row and returns it.

**Alternatives**: `ON CONFLICT (idempotency_key) DO UPDATE SET ...` to merge the new payload into the existing row.

**Why this won**: idempotency means "the same key always produces the same job"; allowing the second call to overwrite the first row violates that contract. The Stripe API convention (same key → same response, regardless of how the body changed) is the standard, and matches user intuition: if I retry a POST with the same Idempotency-Key, I expect to get the original job, not a job that mysteriously changed shape. The cost is one extra round-trip on conflict; the benefit is correctness.

### `Uuid::now_v7` for ids (not v4 or sequence)

**Decision**: every `JobId` is a `Uuid::now_v7` (time-ordered, 128-bit).

**Alternatives**: `Uuid::new_v4` (random) or a `BIGSERIAL` integer column.

**Why this won**:
- vs. v4: v7 is time-ordered, which gives index locality. Insertion patterns tend to hit adjacent pages in the primary-key BTree instead of scattering across the whole index. The expected benefit is lower index churn for high-insert workloads; this repo does not include a v4-vs-v7 write-amplification benchmark.
- vs. `BIGSERIAL`: opaque external ids resist [insecure direct object reference] attacks — a client cannot guess valid job ids by incrementing. They also work across sharded deployments without coordination.
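
For reference, a minimal snippet; `Uuid::now_v7` requires the `uuid` crate with the `v7` feature enabled:

```rust
use uuid::Uuid; // with the "v7" feature

fn new_job_id() -> Uuid {
    // Time-ordered: the high bits carry a millisecond Unix timestamp, so consecutive
    // inserts land near each other in the primary-key BTree.
    Uuid::now_v7()
}
```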

[insecure direct object reference]: https://owasp.org/www-community/attacks/Insecure_Direct_Object_Reference_Prevention_Cheat_Sheet

[`SimulatedExecutor::execute`]: ../src/worker/executor.rs