Graphile Worker RS
A powerful PostgreSQL-backed job queue for Rust applications, based on Graphile Worker. This is a complete Rust rewrite that offers excellent performance, reliability, and a convenient API.
Overview
Graphile Worker RS allows you to run jobs (such as sending emails, performing calculations, generating PDFs) in the background, so your HTTP responses and application code remain fast and responsive. It's ideal for any PostgreSQL-backed Rust application.
Key highlights:
- High performance: Uses PostgreSQL's
SKIP LOCKEDfor efficient job fetching - Low latency: Typically under 3ms from task schedule to execution using
LISTEN/NOTIFY - Reliable: Automatically retries failed jobs with exponential backoff
- Flexible: Supports scheduled jobs, task queues, cron-like recurring tasks, and more
- Type-safe: Uses Rust's type system to ensure job payloads match their handlers
Differences from Node.js version
This port is mostly compatible with the original Graphile Worker, meaning you can run it side by side with the Node.js version. The key differences are:
- In the Node.js version, each process has its own worker_id. In the Rust version, there is only one worker_id, and jobs are processed in your async runtime thread
Installation
Add the library to your project:
Tokio is the default runtime:
= { = "0.11.4", = ["tls-rustls"] }
= { = "1", = ["macros", "rt-multi-thread"] }
= "0.3"
To use async-std instead, disable default features and enable runtime-async-std.
Applications using #[async_std::main] also need async-std's attributes
feature:
= { = "0.11.4", = false, = ["runtime-async-std", "tls-rustls"] }
= { = "1", = ["attributes"] }
= "0.3"
Getting Started
1. Define a Task
A task consists of a struct that implements the TaskHandler trait. Each task has:
- A struct with
Serialize/Deserializefor the payload - A unique identifier string
- An async
runmethod that contains the task's logic
use ;
use ;
2. Configure and Run the Worker
Set up the worker with your configuration options and run it. Use the entrypoint for the runtime you enabled:
async
async
The worker setup itself is runtime-neutral:
async
For larger applications, modules can expose their jobs as reusable definitions and register them together:
use ;
default
.define_jobs
// ... other configuration
.init
.await?;
Custom shutdown handling
Graphile Worker installs OS-level signal handlers (like SIGINT/SIGTERM) so
it can shut down gracefully when you press Ctrl+C. If your application already
owns the shutdown lifecycle, disable the built-in listeners and call
Worker::request_shutdown() when your orchestrator asks the worker to stop:
use FutureExt;
let worker = default
.listen_os_shutdown_signals // prevent installing Ctrl+C handlers
// ... other configuration
.init
.await?;
let run_loop = worker.run.fuse;
let shutdown = on_shutdown.fuse;
pin_mut!;
select_biased!
3. Schedule Jobs
Option A: Schedule a job via SQL
Connect to your database and run the following SQL:
SELECT graphile_worker.add_job(
'send_email',
json_build_object(
'to', 'user@example.com',
'subject', 'Welcome to our app!',
'body', 'Thanks for signing up.'
)
);
Option B: Schedule a job from Rust
// Get a WorkerUtils instance to manage jobs
let utils = worker.create_utils;
// Type-safe method (recommended):
utils.add_job.await?;
// Or use the raw method when type isn't available:
utils.add_raw_job.await?;
Option C: Batch job scheduling
For efficiency when adding many jobs at once, use batch methods:
// Batch add jobs of the same type (type-safe)
let spec = default;
utils..await?;
// Batch add jobs of different types (dynamic)
utils.add_raw_jobs.await?;
Option D: Batch job processing
Batch jobs store a JSON array in a single job. Batch handlers can return per-item results, allowing the worker to remove successful items and retry only the failed items.
use ;
use ;
let worker = default
.
// ... other configuration
.init
.await?;
worker.create_utils
.add_batch_job
.await?;
Advanced Features
Shared Application State
You can provide shared state to all your tasks using extensions:
use ;
use ;
use ;
// Define your shared state
// Example database client (just for demonstration)
;
// Add the extension when configuring the worker
let app_state = AppState ;
default
.add_extension
.
// ... other configuration
.init
.await?;
Scheduling Options
You can customize how and when jobs run with the JobSpec builder:
use ;
use Utc;
// Schedule a job to run after 5 minutes with high priority
let job_spec = new
.run_at
.priority
.job_key // Unique identifier for deduplication
.job_key_mode // Replace existing jobs with this key
.max_attempts // Max retry attempts (default is 25)
.build;
utils.add_job.await?;
Job Queues
Jobs with the same queue name run in series (one after another) rather than in parallel:
// These jobs will run one after another, not concurrently
let spec1 = new
.queue_name
.build;
let spec2 = new
.queue_name
.build;
utils.add_job.await?;
utils.add_job.await?;
Cron Jobs
You can schedule recurring jobs with the typed cron API:
use ;
// Run a task daily at 8:00 AM UTC, with one hour of backfill.
let worker = default
.
.with_cron
// ... other configuration
.init
.await?;
Crontab text is still supported when you need Graphile Worker's file-style syntax:
let worker = default
.
.with_cron?
// ... other configuration
.init
.await?;
with_crontab(...) remains available as a deprecated compatibility alias.
Local Queue
The Local Queue feature batch-fetches jobs from the database and caches them locally, significantly reducing database load in high-throughput scenarios.
use ;
use Duration;
let worker = default
.local_queue
.
.pg_pool
.init
.await?;
The Local Queue operates in several modes:
- Polling: Actively fetching jobs from the database
- Waiting: Jobs are cached locally, serving from cache
- TtlExpired: Cache TTL expired, returning jobs to database
Key benefits:
- Reduces database round-trips by fetching jobs in batches
- Configurable cache size and TTL
- Automatic return of unclaimed jobs on shutdown or TTL expiry
- Refetch delay prevents thundering herd on empty queues
Lifecycle Hooks
You can observe and intercept job lifecycle events using plugins that implement the Plugin trait. This is useful for logging, metrics, validation, and custom job handling logic.
use Arc;
use ;
use ;
Intercepting Jobs
The BeforeJobRun and AfterJobRun hooks can intercept jobs and change their behavior:
;
Registering Plugins
Add plugins when configuring the worker:
let worker = default
.
.add_plugin
.add_plugin
.pg_pool
.init
.await?;
Multiple plugins can be registered and they will all receive hook calls in the order they were added.
Available Hooks
| Hook | Type | Description |
|---|---|---|
WorkerInit |
Observer | Called when worker is initializing |
WorkerStart |
Observer | Called when worker starts processing |
WorkerShutdown |
Observer | Called when worker is shutting down |
JobFetch |
Observer | Called when a job is fetched from the queue |
JobStart |
Observer | Called before a job starts executing |
JobComplete |
Observer | Called after a job completes successfully |
JobFail |
Observer | Called when a job fails (will retry) |
JobPermanentlyFail |
Observer | Called when a job exceeds max attempts |
CronTick |
Observer | Called on each cron scheduler tick |
CronJobScheduled |
Observer | Called when a cron job is scheduled |
LocalQueueInit |
Observer | Called when local queue is initialized |
LocalQueueSetMode |
Observer | Called when local queue changes mode |
LocalQueueGetJobsComplete |
Observer | Called after batch fetching jobs |
LocalQueueReturnJobs |
Observer | Called when jobs are returned to database |
LocalQueueRefetchDelayStart |
Observer | Called when refetch delay starts |
LocalQueueRefetchDelayAbort |
Observer | Called when refetch delay is aborted |
LocalQueueRefetchDelayExpired |
Observer | Called when refetch delay expires |
BeforeJobRun |
Interceptor | Can skip, fail, or continue job execution |
AfterJobRun |
Interceptor | Can modify the job result after execution |
BeforeJobSchedule |
Interceptor | Can skip, fail, or transform job before scheduling |
Job Management Utilities
The WorkerUtils class provides methods for managing jobs:
// Get a WorkerUtils instance
let utils = worker.create_utils;
// Remove a job by its key
utils.remove_job.await?;
// Mark jobs as completed
utils.complete_jobs.await?;
// Permanently fail jobs with a reason
utils.permanently_fail_jobs.await?;
// Reschedule jobs
let options = RescheduleJobOptions ;
utils.reschedule_jobs.await?;
// Run database cleanup tasks
utils.cleanup.await?;
CLI
The workspace includes a dedicated graphile_worker_cli crate with a
graphile-worker binary for managing jobs from a terminal.
DATABASE_URL=postgres://postgres:postgres@localhost/postgres
Use --schema or GRAPHILE_WORKER_SCHEMA for non-default schemas, and
--json when machine-readable output is preferred.
The CLI can also serve the embedded Leptos admin UI. It serves its HTML, API, Leptos/WASM client bundle, Tailwind CSS, and icon assets from the same binary. HTTP Basic auth is the default; when no password is configured, the CLI prints a random password at startup.
The crate ships prebuilt admin UI assets, so cargo install graphile_worker_cli
does not require npm, wasm-bindgen, or the wasm32-unknown-unknown Rust
target. Maintainers can force a source rebuild with
GRAPHILE_WORKER_ADMIN_UI_REBUILD=1; that path needs those tools. Use
GRAPHILE_WORKER_ADMIN_UI_UPDATE_PREBUILT=1 when intentionally refreshing the
checked-in prebuilt assets.
DATABASE_URL=postgres://postgres:postgres@localhost/postgres
The admin UI supports dashboard stats, job list filtering, pasted multi-value column filters, row selection, copy JSON/CSV actions, job add/complete/fail/run now/reschedule/remove-by-key actions, queue and worker views, force unlock, cleanup, migrations, theming, and dark mode.
Feature List
- Flexible deployment: Run standalone or embedded in your application
- Multi-language support: Use from Rust, SQL or alongside the Node.js version
- Performance optimized:
- Low latency job execution (typically under 3ms)
- PostgreSQL
LISTEN/NOTIFYfor immediate job notifications SKIP LOCKEDfor efficient job fetching
- Robust job processing:
- Parallel processing with customizable concurrency
- Serialized execution via named queues
- Automatic retries with exponential backoff
- Customizable retry counts (default: 25 attempts over ~3 days)
- Scheduling features:
- Delayed execution with
run_at - Job prioritization
- Crontab-like recurring tasks
- Task deduplication via
job_key - Batch job processing with partial retry support
- Delayed execution with
- Lifecycle hooks: Observe and intercept job events for logging, metrics, and validation
- Type safety: End-to-end type checking of job payloads
- Minimal overhead: Direct serialization of task payloads
Requirements
- PostgreSQL 12+
- Required for the
generated always as (expression)feature - May work with older versions but has not been tested
- Required for the
Project Status
Production ready but the API may continue to evolve. If you encounter any issues or have feature requests, please open an issue on GitHub.
Acknowledgments
This library is a Rust port of the excellent Graphile Worker by Benjie Gillam. If you find this library useful, please consider sponsoring Benjie's work, as all the research and architecture design was done by him.
License
MIT License - See LICENSE.md