# Architecture
## Crate shape
One Cargo package, `rust-job-queue-api-worker-system`. Two optional feature flags, plus an ungated core, carve out three audiences:
| Feature | Always on? | What it adds |
|---|---|---|
| _(core — no feature gate)_ | yes | Domain types, payload validation, `PgPool`/migrations, the queue SQL, retry math |
| `api` | yes by default | Axum router, HTTP error mapping, OpenAPI assembly, Swagger UI, Prometheus exporter |
| `worker` | yes by default | `Executor` trait, `SimulatedExecutor`, `WorkerRuntime`, recovery sweep |
Three binaries, the first two gated by `required-features`:
| Binary | Feature required | Purpose |
|---|---|---|
| `job-queue-api` | `api` | HTTP server |
| `job-queue-worker` | `worker` | Async job processor |
| `job-queue-migrate` | _(none)_ | One-shot migration runner |
CI checks the gating: three of the steps in [.github/workflows/ci.yml](../.github/workflows/ci.yml) run `cargo check --no-default-features --features X` to confirm each feature compiles in isolation.
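In `Cargo.toml` terms the gating looks roughly like this (a sketch only — the real manifest's optional dependency lists will differ):

```toml
[features]
default = ["api", "worker"]
api = []      # real manifest enables optional HTTP deps here (axum, etc.)
worker = []   # real manifest enables optional worker deps here

[[bin]]
name = "job-queue-api"
required-features = ["api"]

[[bin]]
name = "job-queue-worker"
required-features = ["worker"]

[[bin]]
name = "job-queue-migrate"   # core only; no feature gate
```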
## Data model
```
jobs
├── id UUID PK — Uuid v7, time-ordered
├── kind job_kind ENUM — send_email | resize_image | summarize_text | webhook_delivery
├── payload JSONB — opaque to the queue; validated by core::payload::validate
├── status job_status ENUM — queued | running | succeeded | retrying | failed_permanent | cancelled
├── attempts INT — 0 at insert; +1 at each dequeue
├── max_attempts INT — 3 by default
├── last_error TEXT — most recent error message; kept even when a later retry succeeds
├── run_at TIMESTAMPTZ — earliest time the row is eligible for dequeue
├── locked_at TIMESTAMPTZ — diagnostic + recovery sweep
├── locked_by TEXT — worker id, diagnostic
├── cancel_requested BOOLEAN — cooperative-cancel flag for running jobs
├── idempotency_key TEXT — nullable; partial unique index
├── created_at TIMESTAMPTZ
└── updated_at TIMESTAMPTZ
```
Four indices:
- **`jobs_ready_idx` on `(run_at)` WHERE `status IN ('queued','retrying')`** — the partial index the SKIP LOCKED dequeue scans. Narrow and hot.
- **`jobs_idempotency_key_uidx` on `(idempotency_key)` WHERE `idempotency_key IS NOT NULL`** — partial UNIQUE so multiple NULL keys are allowed.
- **`jobs_status_idx` on `(status)`** — supports status-filtered list queries.
- **`jobs_created_at_idx` on `(created_at DESC)`** — supports the default reverse-chronological list.
Plus three CHECK constraints (`attempts >= 0`, `max_attempts >= 1`, `attempts <= max_attempts`) so the DB enforces the invariants even when application code has a bug.
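In migration-SQL form, the indices and constraints above look roughly like this (a sketch; the actual migration files may use different constraint names):

```sql
CREATE INDEX jobs_ready_idx ON jobs (run_at)
    WHERE status IN ('queued', 'retrying');

CREATE UNIQUE INDEX jobs_idempotency_key_uidx ON jobs (idempotency_key)
    WHERE idempotency_key IS NOT NULL;

CREATE INDEX jobs_status_idx ON jobs (status);
CREATE INDEX jobs_created_at_idx ON jobs (created_at DESC);

ALTER TABLE jobs
    ADD CONSTRAINT jobs_attempts_nonneg   CHECK (attempts >= 0),
    ADD CONSTRAINT jobs_max_attempts_min  CHECK (max_attempts >= 1),
    ADD CONSTRAINT jobs_attempts_bounded  CHECK (attempts <= max_attempts);
```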
## Status lifecycle
```mermaid
stateDiagram-v2
[*] --> queued: enqueue
queued --> running: SKIP LOCKED dequeue
queued --> cancelled: request_cancel
running --> succeeded: mark_succeeded
running --> retrying: mark_failed_or_retry (attempts < max)
running --> failed_permanent: mark_failed_or_retry (attempts >= max)
running --> cancelled: cancel_requested observed
retrying --> running: SKIP LOCKED dequeue (after run_at <= now())
retrying --> cancelled: request_cancel
succeeded --> [*]
failed_permanent --> [*]
cancelled --> [*]
```
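The diagram's three terminal states reduce to a small predicate. This is a hypothetical mirror of the enum, not the actual type in `src/`:

```rust
/// Hypothetical mirror of the `job_status` Postgres enum above;
/// the real type in the crate may differ in derives and naming.
#[derive(Debug, PartialEq)]
enum JobStatus {
    Queued,
    Running,
    Succeeded,
    Retrying,
    FailedPermanent,
    Cancelled,
}

impl JobStatus {
    /// Terminal states have no outgoing edges in the diagram.
    fn is_terminal(&self) -> bool {
        matches!(
            self,
            JobStatus::Succeeded | JobStatus::FailedPermanent | JobStatus::Cancelled
        )
    }
}

fn main() {
    assert!(JobStatus::Succeeded.is_terminal());
    assert!(!JobStatus::Retrying.is_terminal());
    println!("ok");
}
```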
## The dequeue (the centerpiece)
```sql
UPDATE jobs
SET status = 'running'::job_status,
attempts = attempts + 1,
locked_at = now(),
locked_by = $1,
updated_at = now()
WHERE id = (
SELECT id FROM jobs
WHERE status IN ('queued','retrying')
AND run_at <= now()
AND cancel_requested = FALSE
ORDER BY run_at
FOR UPDATE SKIP LOCKED
LIMIT 1
)
RETURNING <columns>;
```
Single statement, atomic. The inner `SELECT ... FOR UPDATE SKIP LOCKED` is the canonical Postgres job-queue pattern (also used by `sqlxmq`, `graphile-worker`, `Que` for Ruby, `Oban` for Elixir). Each worker sees a disjoint subset of rows; locked rows are skipped, not waited on. Empirically validated by [tests/concurrency_two_workers.rs](../tests/concurrency_two_workers.rs): 200 jobs across 8 worker tasks, every job processed exactly once.
Idle workers poll at 500 ms, backing off to a 2 s ceiling. There is no LISTEN/NOTIFY — see [tradeoffs.md](tradeoffs.md).
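The idle backoff can be sketched as a pure function (illustrative, not the runtime's actual code — names and the doubling step are assumptions consistent with the 500 ms floor and 2 s ceiling above):

```rust
use std::time::Duration;

/// Sketch: start at 500 ms, double on each empty poll up to a 2 s
/// ceiling, and reset to the floor as soon as a job is claimed.
fn next_poll_interval(current: Duration, claimed: bool) -> Duration {
    const FLOOR: Duration = Duration::from_millis(500);
    const CEILING: Duration = Duration::from_secs(2);
    if claimed { FLOOR } else { CEILING.min(current * 2) }
}

fn main() {
    let mut interval = Duration::from_millis(500);
    for _ in 0..4 {
        interval = next_poll_interval(interval, false);
        println!("{} ms", interval.as_millis()); // 1000, 2000, 2000, 2000
    }
    // A successful claim resets the interval to the floor.
    println!("{} ms", next_poll_interval(interval, true).as_millis()); // 500
}
```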
## Enqueue path
```mermaid
sequenceDiagram
participant Client
participant API as /jobs (POST)
participant DB as Postgres
Client->>API: POST {kind, payload, ...} + Idempotency-Key header
API->>API: validate payload (core::payload::validate)
API->>DB: INSERT ... ON CONFLICT (idempotency_key) DO NOTHING RETURNING *
alt new row
DB-->>API: row
API-->>Client: 201 Created + JobResponse
else key already present
DB-->>API: no rows
API->>DB: SELECT * WHERE idempotency_key = $1
DB-->>API: existing row
API-->>Client: 200 OK + JobResponse
end
```
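The INSERT arm of the diagram looks roughly like this (a sketch — the column list is abbreviated, and the real query lives in the queue module). Note that to target a *partial* unique index, Postgres requires the conflict target to repeat the index predicate:

```sql
INSERT INTO jobs (id, kind, payload, idempotency_key, run_at)
VALUES ($1, $2, $3, $4, coalesce($5, now()))
ON CONFLICT (idempotency_key) WHERE idempotency_key IS NOT NULL
    DO NOTHING
RETURNING *;
```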
## Retry policy
Decorrelated-jitter approximation. The next wait is uniformly drawn from `[BASE_MS, min(CAP_MS, BASE_MS · 3^attempt)]` with `BASE_MS = 1_000` and `CAP_MS = 60_000`. This matches the spread shape of the AWS Architecture Blog's decorrelated-jitter sequence without needing to persist the previous wait — bucketing by attempt count is enough.
On a failure, `mark_failed_or_retry`:
- if `attempts >= max_attempts` → status becomes `failed_permanent`, `run_at` unchanged.
- otherwise → status becomes `retrying`, `run_at = now() + backoff(attempts)`.
Workers naturally pick up retries because the dequeue clause includes `run_at <= now()` — no separate scheduler.
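The attempt-bucketed upper bound can be sketched as follows (the real implementation also draws uniformly within the interval; the sampling is elided here to stay dependency-free):

```rust
const BASE_MS: u64 = 1_000;
const CAP_MS: u64 = 60_000;

/// Upper bound of the uniform backoff draw for a given attempt count.
/// The actual wait is sampled uniformly from [BASE_MS, this value].
fn backoff_upper_ms(attempt: u32) -> u64 {
    CAP_MS.min(BASE_MS.saturating_mul(3u64.saturating_pow(attempt)))
}

fn main() {
    for attempt in 1..=5 {
        // attempt 1: [1000, 3000] ms … attempt 4+: capped at [1000, 60000] ms
        println!("attempt {attempt}: [{BASE_MS}, {}] ms", backoff_upper_ms(attempt));
    }
}
```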
## Cancellation
Cancellation has three-way semantics:
- `queued | retrying`: atomic UPDATE to `cancelled`. Returns `CancelOutcome::CancelledNow` → HTTP 200.
- `running`: set `cancel_requested = TRUE`. The worker re-reads this flag between sub-steps in [src/worker/executor.rs](../src/worker/executor.rs) and transitions to `cancelled` on observation. Returns `CancelOutcome::PendingOnWorker` → HTTP 202.
- terminal (`succeeded | failed_permanent | cancelled`): no-op. Returns `CancelOutcome::AlreadyTerminal(status)` → HTTP 409.
Tokio tasks cannot be safely preempted; cooperative cancellation is the only correct production pattern.
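The three-way mapping above can be mirrored in a small sketch (hypothetical shapes — the real `CancelOutcome` in the crate may differ):

```rust
/// Hypothetical mirror of the crate's CancelOutcome; the terminal
/// variant carries the status name for the 409 body.
#[derive(Debug, PartialEq)]
enum CancelOutcome {
    CancelledNow,                  // was queued or retrying
    PendingOnWorker,               // was running; cancel_requested set
    AlreadyTerminal(&'static str), // succeeded / failed_permanent / cancelled
}

fn http_status(outcome: &CancelOutcome) -> u16 {
    match outcome {
        CancelOutcome::CancelledNow => 200,
        CancelOutcome::PendingOnWorker => 202,
        CancelOutcome::AlreadyTerminal(_) => 409,
    }
}

fn main() {
    assert_eq!(http_status(&CancelOutcome::CancelledNow), 200);
    assert_eq!(http_status(&CancelOutcome::PendingOnWorker), 202);
    assert_eq!(http_status(&CancelOutcome::AlreadyTerminal("succeeded")), 409);
    println!("ok");
}
```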
## Graceful shutdown + recovery
On SIGINT/SIGTERM the worker bin flips a `CancellationToken`. The worker loops exit between jobs. An in-flight job that takes longer than the 30 s grace window is hard-aborted; its row stays in `running` with `locked_at` set. On the next startup, [src/worker/recovery.rs](../src/worker/recovery.rs) calls `queue::recover_stale` which resets any `running` row with `locked_at < now() - 5min` back to `retrying`.
This is the production-correct pattern: at no point do we mark a job failed because the worker process exited.
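A sketch of what `queue::recover_stale` amounts to in SQL (the interval is shown at its 5-minute default; nulling the lock columns is an assumption, not verified against the source):

```sql
UPDATE jobs
SET status     = 'retrying'::job_status,
    locked_at  = NULL,
    locked_by  = NULL,
    updated_at = now()
WHERE status = 'running'
  AND locked_at < now() - interval '5 minutes';
```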
## Observability
- `tracing_subscriber` with JSON formatter when `RUST_LOG_FORMAT=json`, pretty otherwise.
- HTTP layer middleware (in order): `SetRequestIdLayer` → `TraceLayer` (with custom span carrying `request_id`) → `PropagateRequestIdLayer` → 30 s `TimeoutLayer` returning 504.
- Worker spans: `tracing::info_span!("job", job_id, kind, attempt, worker_id)` per claimed job.
- Prometheus metrics via `metrics-exporter-prometheus`. The API binary serves `/metrics` on the same port as its HTTP API (default 8080). The worker binary installs an independent HTTP listener on `WORKER_METRICS_BIND_ADDR` (default `0.0.0.0:9091`) so a single Prometheus scrape config can target each process. Worker emits `worker_jobs_started_total{kind}`, `worker_jobs_completed_total{kind,outcome}`, and `worker_job_duration_seconds{kind,outcome}` (histogram).
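A minimal Prometheus scrape config matching those defaults might look like this (illustrative only — job names and hostnames are assumptions):

```yaml
scrape_configs:
  - job_name: job-queue-api
    metrics_path: /metrics
    static_configs:
      - targets: ["localhost:8080"]
  - job_name: job-queue-worker
    static_configs:
      - targets: ["localhost:9091"]  # WORKER_METRICS_BIND_ADDR port
```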
## Concurrency test
[tests/concurrency_two_workers.rs](../tests/concurrency_two_workers.rs) creates a separate `processed_log` table, enqueues 200 jobs, spins up 8 independent `WorkerRuntime` instances against the same Postgres, and asserts:
1. All 200 jobs reach `succeeded`.
2. `processed_log` has exactly 200 rows.
3. `processed_log` has exactly 200 **distinct** `job_id` values.
4. Every job has `attempts = 1` (each was claimed exactly once).
5. More than one worker actually participated (sanity: contention was exercised).
This is bounded empirical evidence for the tested schedule and harness. It does not claim global exactly-once semantics for crashes or arbitrary external side effects; duplicate processing in this test would show up as duplicate `job_id` rows in `processed_log`.
## Design decisions
Each subsection here is written in the shape of a PR description: the call that was made, the alternatives considered, and the reason the chosen alternative won.
### Single crate with feature flags (not a workspace of three crates)
**Decision**: one Cargo package, two optional features (`api`, `worker`), three binaries with `required-features` gating.
**Alternatives**: a workspace of three crates (`core` library + `api` binary + `worker` binary). This was the initial direction during the first build phases; the code was later folded back into a single crate.
**Why this won**: a single crate publishes more cleanly to crates.io (one `cargo publish`, one README on the crate page, one set of metadata) while feature flags still keep HTTP and worker-specific dependencies off consumers who only want the queue types. This is the pattern `axum`, `tower-http`, and `tokio` use to publish modular libraries. The cost is that workspace-style separation between layers is sacrificed; the benefit is a smaller surface for consumers and a cleaner publishing story. CI checks the decoupling with three `cargo check --no-default-features --features X` probes.
### Polling-only dequeue (no LISTEN/NOTIFY)
**Decision**: workers poll the queue at 500 ms, backing off to a 2 s ceiling when idle. There is no `pg_notify` channel.
**Alternatives**: a hybrid that uses LISTEN/NOTIFY for low-latency wake-up plus polling as a fallback for missed notifications.
**Why this won**: the additional code path roughly doubles the worker's wakeup-and-dispatch surface for, at most, ~250 ms of latency saving on an idle queue. Under any meaningful load the workers are always behind the queue and polling makes no observable difference. For an intentionally-small artifact, the simpler shape wins. Adding NOTIFY later is a non-breaking change.
### Cooperative cancellation (not preemptive)
**Decision**: `POST /jobs/{id}/cancel` on a running job sets a `cancel_requested = TRUE` flag; the worker observes that flag between sub-steps in [`SimulatedExecutor::execute`] and transitions the row to `cancelled` only when it sees the flag.
**Alternatives**: pre-empt the task with `JoinHandle::abort()` immediately.
**Why this won**: Tokio tasks cannot be safely preempted mid-step. Any cleanup, transactional write, or external side-effect that the executor was about to perform would be lost if the task is killed asynchronously. Cooperative cancellation is the only correct production pattern for an async runtime that doesn't have task-level transactions. The trade-off is observable latency: a job is cancelled at the next sub-step boundary, not instantly.
### 5-minute stale-lock threshold
**Decision**: the recovery sweep only re-queues rows whose `locked_at < now() - interval '5 minutes'`.
**Alternatives**: a shorter threshold (30 s, 1 minute) to recover faster.
**Why this won**: a shorter threshold risks recovering jobs that are actually still being processed by a live worker. The worst case under 5 minutes is a 5-minute stall on a row whose worker truly crashed; the worst case under a 30-second threshold is *duplicate processing* of a row whose worker is currently mid-execution but holding a long external call. Duplicate side-effects are far worse than a 5-minute stall. The threshold is configurable via the `recover_stale_at_startup` argument; production deployments with shorter SLAs can tune it.
### `ON CONFLICT DO NOTHING` for idempotency (not `DO UPDATE`)
**Decision**: when a duplicate `idempotency_key` is observed during `enqueue`, the insert is rejected (`DO NOTHING`); the application then re-fetches the existing row and returns it.
**Alternatives**: `ON CONFLICT (idempotency_key) DO UPDATE SET ...` to merge the new payload into the existing row.
**Why this won**: idempotency means "the same key always produces the same job"; allowing the second call to overwrite the first row violates that contract. The Stripe API convention (same key → same response, regardless of how the body changed) is the standard, and matches user intuition: if I retry a POST with the same Idempotency-Key, I expect to get the original job, not a job that mysteriously changed shape. The cost is one extra round-trip on conflict; the benefit is correctness.
### `Uuid::now_v7` for ids (not v4 or sequence)
**Decision**: every `JobId` is a `Uuid::now_v7` (time-ordered, 128-bit).
**Alternatives**: `Uuid::new_v4` (random) or a `BIGSERIAL` integer column.
**Why this won**:
- vs. v4: v7 is time-ordered, which gives index locality. Insertion patterns tend to hit adjacent pages in the primary-key BTree instead of scattering across the whole index. The expected benefit is lower index churn for high-insert workloads; this repo does not include a v4-vs-v7 write-amplification benchmark.
- vs. `BIGSERIAL`: opaque external ids resist [insecure direct object reference] attacks — a client cannot guess valid job ids by incrementing. They also work across sharded deployments without coordination.
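The time-ordering property comes from UUIDv7's layout: the top 48 bits are the Unix timestamp in milliseconds, so ids generated later sort lexicographically after ids generated earlier. A dependency-free illustration (this is not the `uuid` crate's API, just the timestamp prefix):

```rust
/// Illustrative only: format the 48-bit millisecond timestamp that
/// occupies the leading 12 hex digits of a UUIDv7.
fn v7_timestamp_prefix(unix_ms: u64) -> String {
    format!("{:012x}", unix_ms & 0xFFFF_FFFF_FFFF)
}

fn main() {
    let earlier = v7_timestamp_prefix(1_000);
    let later = v7_timestamp_prefix(2_000);
    assert!(earlier < later); // time order implies sort order
    println!("{earlier}"); // 0000000003e8
}
```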
[insecure direct object reference]: https://owasp.org/www-community/attacks/Insecure_Direct_Object_Reference_Prevention_Cheat_Sheet
[`SimulatedExecutor::execute`]: ../src/worker/executor.rs