forge-jobs
Sidekiq-style job queue for Rust with embedded SQLite and pluggable
Postgres. Register handlers, enqueue jobs; the runtime claims / runs /
finalizes them across N worker tasks. The same code path runs
single-process on SQLite (local desktop, CLI tools) or multi-replica
on Postgres (deployed service). Battle-tested against ~2K tickets and
~19K activities for months before being extracted into its own crate.
Install
[dependencies]
forge-jobs = "0.1"
# Or, to enable the optional Postgres adapter for deployed multi-replica use:
forge-jobs = { version = "0.1", features = ["postgres"] }
Pre-publish (using the workspace directly):
[dependencies]
forge-jobs = { git = "https://github.com/dandush03/forge" }
Status
0.1 — internal API mostly stable. A few naming passes may happen
pre-1.0. Pin a specific version if you want byte-for-byte
reproducibility during this window.
Features
- Two backends, one runtime: SQLite for local-first single-process apps; Postgres (--features postgres) for multi-replica deploys with SELECT … FOR UPDATE SKIP LOCKED claims and LISTEN/NOTIFY wakeups.
- Per-queue worker pools with cooperative shutdown, stale-heartbeat reaper, scheduled-job retention sweep.
- Cron schedules with a lease-elected leader so only one replica fires each tick. The same lease gates the retention sweep and metrics roller, so N pods don't each redundantly delete the same rows.
- Configurable backoff: per-queue exponential curve with a clean on/off toggle. backoff_enabled = false means failures retry immediately (bounded by max_attempts); backoff_enabled = true reads base_seconds and max_seconds from the queue's row.
- Cancellation: QueueHandle::request_cancel(&JobId) instant-cancels in-process; cross-replica cancels flow through a cancel_requested_at DB flag that the worker's heartbeat tick observes within HEARTBEAT_INTERVAL. User-cancelled jobs route straight to Dead (no retry budget waste).
- Cluster-wide rate-limit budget: handlers call ctx.rate_limit.acquire("slack") (or "gh", or any scope) against a DB-backed token bucket. Two replicas can't both spend the same last token. A real upstream 429 drains the bucket so siblings observe empty on their next acquire instead of each firing their own redundant 429.
- JobOutcome::Dead(msg) for terminal failures: handlers that can prove a retry would also fail (thread_not_found, 404, deleted upstream resource) skip the retry budget entirely. No more burning five attempts × backoff curve for a permanently-gone resource.
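To make the backoff toggle concrete, here is a minimal, std-only sketch of how a retry delay could be derived from the settings named above. The struct, the function name, the doubling factor, and the reading of max_attempts as "total attempts allowed" are all assumptions for illustration; the crate's actual curve and types may differ.

```rust
use std::time::Duration;

/// Hypothetical mirror of the per-queue settings named in this README.
/// Field names follow the README; the struct itself is illustrative.
#[derive(Debug, Clone, Copy)]
pub struct BackoffConfig {
    pub backoff_enabled: bool,
    pub base_seconds: u64,
    pub max_seconds: u64,
    pub max_attempts: u32,
}

/// Delay before the next retry, or `None` once the retry budget is spent.
/// `attempt` is 1-based: the number of the attempt that just failed.
pub fn retry_delay(cfg: &BackoffConfig, attempt: u32) -> Option<Duration> {
    // Assumption: `max_attempts` counts total attempts, so no retry
    // is scheduled after the last one fails.
    if attempt >= cfg.max_attempts {
        return None;
    }
    // Toggle off: retry immediately, still bounded by `max_attempts`.
    if !cfg.backoff_enabled {
        return Some(Duration::ZERO);
    }
    // Assumed curve: base_seconds * 2^(attempt - 1), capped at max_seconds.
    let exponent = attempt.saturating_sub(1).min(63);
    let factor = 1u64 << exponent;
    let secs = cfg.base_seconds.saturating_mul(factor).min(cfg.max_seconds);
    Some(Duration::from_secs(secs))
}
```

Under these assumptions, a queue with base_seconds = 5 and max_seconds = 30 would wait 5, 10, 20, then 30 seconds between attempts, and the same queue with backoff_enabled = false would retry with zero delay until max_attempts is reached.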
What it doesn't give you
- An HTTP transport: see the sibling crate forge-jobs-api for Axum routes + DTOs.
- A UI: see forge-jobs-ui for a Leptos panel that consumes a small QueueIpc trait.
- A built-in paths resolver: you implement the small QueuePaths trait so the queue stays reusable across hosts. See examples/minimal.rs for the canonical pattern.
Architecture
Four pieces, all built on the storage traits. Swap the backend by swapping trait impls; the rest of the crate doesn't change.
                    ┌──────────────┐
                    │ QueueRuntime │  per-queue supervisor + N worker tasks +
                    └──────┬───────┘  reaper + cleanup + cron + metrics
                           │
       ┌───────────────────┼───────────────────┐
       ▼                   ▼                   ▼
  ┌─────────┐        ┌────────────┐   ┌──────────────────┐
  │ JobQueue│        │QueueConfig │   │ RateLimitStorage │  …+ ProcessRegistry,
  │         │        │            │   │                  │     CronStorage
  └─────────┘        └────────────┘   └──────────────────┘
         \                 |                 /
             \             |             /
                 ▼         ▼         ▼
               ┌──────────────────────┐
               │ SqliteStorage /      │
               │ PostgresStorage      │
               └──────────────────────┘
Minimal consumer
See examples/minimal.rs. The 30-line shape:
- Implement QueuePaths for your project's paths layer (or use env vars + CWD-relative fallbacks, like the bundled jobs-db CLI does)
- DatabaseConfig::load(&paths)?.open_storage(&paths).await?
- Build a HandlerRegistry, register your handlers
- QueueRuntime::new(storage, handlers, router).with_queues([...]): declare the queues this worker consumes (required; or queues_from_env() to read FORGE_QUEUES). Optionally .with_worker_prefix(...) (+ .with_worker_version(...)) for a k8s-style {prefix}-worker-{version}-{id} label on the Workers view; or worker_prefix_from_env() / worker_version_from_env() to read FORGE_WORKER_PREFIX / FORGE_WORKER_VERSION.
- runtime.ensure_queue("default", N).await? for each queue you'll use
- runtime.enqueue(req).await? to seed work
- runtime.start().await? to spawn workers; keep the QueueHandle so you can shutdown_graceful(...) at exit
Handler cancellation contract
Handlers that take longer than ~1 second should periodically check
ctx.cancel.is_cancelled() between .await points. The runtime fires
the cancel token when:
- The user clicks delete on a running job (in-process via request_cancel, or cross-pod via the DB flag the heartbeat observes)
- The supervisor shuts down or scales the queue's worker pool down
A user-initiated cancel routes the job straight to Dead with
"cancelled by user", bypassing the retry budget. A supervisor-
initiated cancel leaves the row in_progress for the reaper.
Pure-Rust handlers that wrap a client.call().await are fine as-is —
at worst one extra upstream call happens after the user clicked
delete. Handlers that loop over paginated upstream results should add
if ctx.cancel.is_cancelled() { return JobOutcome::Failed("cancelled".into()); }
at the loop head.
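The loop-head check described above can be sketched without the crate itself. In this std-only, synchronous illustration, CancelFlag is a stand-in for ctx.cancel, the JobOutcome enum is a local stand-in (Failed is named in this README; Done is a hypothetical success variant), and sync_pages with its fetch_page callback is an invented handler body. Real handlers are async and receive the runtime's own context type.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Local stand-in for the crate's outcome type.
/// `Failed` appears in this README; `Done` is a hypothetical name.
#[derive(Debug, PartialEq)]
pub enum JobOutcome {
    Done,
    Failed(String),
}

/// Stand-in for `ctx.cancel`: a shared flag the runtime would flip.
#[derive(Clone, Default)]
pub struct CancelFlag(Arc<AtomicBool>);

impl CancelFlag {
    pub fn cancel(&self) {
        self.0.store(true, Ordering::SeqCst);
    }
    pub fn is_cancelled(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}

/// Walks pages 0..pages, checking the flag before each upstream call.
/// `fetch_page` is a hypothetical callback standing in for a client call.
pub fn sync_pages<F>(cancel: &CancelFlag, pages: u32, mut fetch_page: F) -> JobOutcome
where
    F: FnMut(u32) -> Result<(), String>,
{
    for page in 0..pages {
        // Loop-head check: stop before starting another upstream call.
        if cancel.is_cancelled() {
            return JobOutcome::Failed("cancelled".into());
        }
        if let Err(err) = fetch_page(page) {
            return JobOutcome::Failed(err);
        }
    }
    JobOutcome::Done
}
```

If the flag flips while page 1 is being fetched, that call still completes and the loop exits before page 2, which matches the "at worst one extra upstream call" behaviour described above.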
Backends
SQLite (default)
SqliteStorage opens a WAL-mode file with a single-writer pool +
multi-reader pool. Migrations run idempotently on open. Suitable for
single-process desktop apps; the lease-elected coordinator paths are
no-ops (the lone process always wins).
Postgres (--features postgres)
PostgresStorage uses SELECT … FOR UPDATE SKIP LOCKED for atomic
claims, LISTEN/NOTIFY for instant wake-up on enqueue, and per-host
process registry rows with heartbeat-based liveness. The cron / cleanup
/ metrics loops gate their work behind a cron_leader lease so N pods
don't each redundantly run them.
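The lease gating can be pictured with a small in-memory model. This is only a sketch of the general lease-election idea, not the crate's implementation: the Lease struct, try_acquire, the integer clock, and the TTL handling are all hypothetical, and in a real deployment the state would live in a database row updated atomically rather than in process memory.

```rust
/// In-memory stand-in for a single lease row such as `cron_leader`.
/// All names here are illustrative.
#[derive(Debug, Default)]
pub struct Lease {
    holder: Option<String>,
    /// Expiry in seconds on a caller-supplied clock.
    expires_at: u64,
}

impl Lease {
    /// Try to take or renew the lease. Succeeds when the lease is free,
    /// has expired, or is already held by `candidate`.
    pub fn try_acquire(&mut self, candidate: &str, now: u64, ttl: u64) -> bool {
        let available = match &self.holder {
            None => true,
            Some(current) => current.as_str() == candidate || now >= self.expires_at,
        };
        if available {
            self.holder = Some(candidate.to_string());
            self.expires_at = now + ttl;
        }
        available
    }

    /// Who currently holds the lease, ignoring expiry.
    pub fn holder(&self) -> Option<&str> {
        self.holder.as_deref()
    }
}
```

Each replica would call try_acquire on its tick and run the cron, cleanup, and metrics work only when it returns true. A replica that stops renewing loses the lease once the TTL lapses, and another replica takes over on its next attempt.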
Configuration
A queue_database.toml file at either <config_dir>/queue_database.toml
(per your QueuePaths impl) or any parent directory of CWD (up to 4
levels) selects the backend. Missing file → SQLite at
<data_dir>/queue.sqlite.
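The lookup order above can be sketched as a pure path computation. This is a hedged illustration, not the crate's loader: the function names are invented, and it assumes the config directory is checked first and that "parent directory of CWD" excludes CWD itself (drop the skip(1) if CWD should be searched too).

```rust
use std::path::{Path, PathBuf};

const FILE_NAME: &str = "queue_database.toml";
/// The README says parents are searched "up to 4 levels".
const MAX_PARENT_LEVELS: usize = 4;

/// Candidate config locations, in the order this sketch would try them.
pub fn candidate_paths(config_dir: &Path, cwd: &Path) -> Vec<PathBuf> {
    let mut out = vec![config_dir.join(FILE_NAME)];
    // `ancestors()` yields `cwd` first, then each parent up to the root.
    // Assumption: only the parents count, so the first item is skipped.
    out.extend(
        cwd.ancestors()
            .skip(1)
            .take(MAX_PARENT_LEVELS)
            .map(|dir| dir.join(FILE_NAME)),
    );
    out
}

/// First candidate that exists as a file. `None` means the caller falls
/// back to the default SQLite database described above.
pub fn find_config(config_dir: &Path, cwd: &Path) -> Option<PathBuf> {
    candidate_paths(config_dir, cwd)
        .into_iter()
        .find(|path| path.is_file())
}
```

Keeping the candidate list separate from the filesystem check makes the search order easy to unit-test without creating files.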
# SQLite (default)
= "sqlite"
# path = "/custom/path/to/queue.sqlite"
# Postgres
= "postgres"
= "db.internal"
= "tech_admin"
= "tech_admin"
= "TECH_ADMIN_DB_PASSWORD" # reads the env var by name
= 30
Admin CLI
cargo run --bin jobs-db -- <create | drop | migrate | reset | status>
Uses JOBS_CONFIG_DIR / JOBS_DATA_DIR env vars for paths (falls
back to ./jobs/config / ./jobs/data when unset).
License
Dual-licensed under either of
- Apache License, Version 2.0 (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0)
- MIT license (LICENSE-MIT or http://opensource.org/licenses/MIT)
at your option. Contributions intentionally submitted for inclusion in this crate shall be dual-licensed as above, without any additional terms or conditions.