§Graphile Worker RS
A powerful PostgreSQL-backed job queue for Rust applications, based on Graphile Worker. This is a complete Rust rewrite that offers excellent performance, reliability, and a convenient API.
§Overview
Graphile Worker RS allows you to run jobs (such as sending emails, performing calculations, generating PDFs) in the background, so your HTTP responses and application code remain fast and responsive. It’s ideal for any PostgreSQL-backed Rust application.
Key highlights:
- High performance: Uses PostgreSQL’s SKIP LOCKED for efficient job fetching
- Low latency: Typically under 3ms from task schedule to execution using LISTEN/NOTIFY
- Reliable: Automatically retries failed jobs with exponential backoff
- Flexible: Supports scheduled jobs, task queues, cron-like recurring tasks, and more
- Type-safe: Uses Rust’s type system to ensure job payloads match their handlers
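To make the SKIP LOCKED highlight concrete, here is a minimal in-memory sketch of the idea: each worker claims the next runnable row and passes over rows another worker already holds, instead of waiting on them. It is an analogy, not the crate's SQL. `JobRow` and `claim_next` are hypothetical names, and the ordering (numerically smaller priority first, then earlier `run_at`) is an assumption taken from upstream Graphile Worker.

```rust
/// In-memory stand-in for a row in the jobs table (hypothetical, for illustration only).
#[derive(Debug, Clone, PartialEq)]
struct JobRow {
    id: u64,
    priority: i32,
    run_at: u64,               // seconds since an arbitrary epoch
    locked_by: Option<String>, // worker currently holding the row, if any
}

/// Claims the next runnable job for `worker`.
/// Rows already locked by another worker are skipped rather than waited on,
/// which is the behaviour `FOR UPDATE SKIP LOCKED` gives concurrent workers.
fn claim_next(jobs: &mut [JobRow], worker: &str, now: u64) -> Option<u64> {
    let idx = jobs
        .iter()
        .enumerate()
        // Only unlocked jobs that are due can be claimed.
        .filter(|(_, job)| job.locked_by.is_none() && job.run_at <= now)
        // Assumed ordering: smaller priority first, then earlier run_at.
        .min_by_key(|(_, job)| (job.priority, job.run_at, job.id))
        .map(|(i, _)| i)?;
    jobs[idx].locked_by = Some(worker.to_string());
    Some(jobs[idx].id)
}
```

With two due jobs and three workers, the first two calls each claim a different job and the third returns `None` immediately, so no worker blocks on a row someone else is processing.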
§Differences from Node.js version
This port is mostly compatible with the original Graphile Worker, meaning you can run it side by side with the Node.js version. The key differences are:
- No support for batch job processing yet (processing array payloads within a single job). Note: batch job scheduling (adding multiple jobs at once) is supported; see the “Option C: Batch job scheduling” section.
- In the Node.js version, each process has its own worker_id. In the Rust version, there is only one worker_id, and jobs are processed in your async runtime thread.
§Installation
Add the library to your project:
cargo add graphile_worker
§Getting Started
§1. Define a Task
A task consists of a struct that implements the TaskHandler trait. Each task has:
- A struct with Serialize/Deserialize for the payload
- A unique identifier string
- An async run method that contains the task’s logic
use serde::{Deserialize, Serialize};
use graphile_worker::{WorkerContext, TaskHandler, IntoTaskHandlerResult};

#[derive(Deserialize, Serialize)]
struct SendEmail {
    to: String,
    subject: String,
    body: String,
}

impl TaskHandler for SendEmail {
    const IDENTIFIER: &'static str = "send_email";

    async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
        println!("Sending email to {} with subject '{}'", self.to, self.subject);
        // Email sending logic would go here
        Ok::<(), String>(())
    }
}
§2. Configure and Run the Worker
Set up the worker with your configuration options and run it:
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create a PostgreSQL connection pool
    let pg_pool = sqlx::postgres::PgPoolOptions::new()
        .max_connections(5)
        .connect("postgres://postgres:password@localhost/mydb")
        .await?;

    // Initialize and run the worker
    graphile_worker::WorkerOptions::default()
        .concurrency(5) // Process up to 5 jobs concurrently
        .schema("graphile_worker") // Use this PostgreSQL schema
        .define_job::<SendEmail>() // Register the task handler
        .pg_pool(pg_pool) // Provide the database connection
        .init() // Initialize the worker
        .await?
        .run() // Start processing jobs
        .await?;

    Ok(())
}
§Custom shutdown handling
Graphile Worker installs OS-level signal handlers (like SIGINT/SIGTERM) so
it can shut down gracefully when you press Ctrl+C. If your application already
owns the shutdown lifecycle, disable the built-in listeners and call
Worker::request_shutdown() when your orchestrator asks the worker to stop:
let worker = graphile_worker::WorkerOptions::default()
    .listen_os_shutdown_signals(false) // prevent installing Ctrl+C handlers
    // ... other configuration
    .init()
    .await?;

tokio::pin! {
    let run_loop = worker.run();
}

tokio::select! {
    // Main worker loop
    result = &mut run_loop => result?,
    // Notify the worker when the host framework wants to stop
    () = on_shutdown() => {
        worker.request_shutdown();
        (&mut run_loop).await; // drain gracefully before returning
    }
}
§3. Schedule Jobs
§Option A: Schedule a job via SQL
Connect to your database and run the following SQL:
SELECT graphile_worker.add_job(
    'send_email',
    json_build_object(
        'to', 'user@example.com',
        'subject', 'Welcome to our app!',
        'body', 'Thanks for signing up.'
    )
);
§Option B: Schedule a job from Rust
// Get a WorkerUtils instance to manage jobs
let utils = worker.create_utils();

// Type-safe method (recommended):
utils.add_job(
    SendEmail {
        to: "user@example.com".to_string(),
        subject: "Welcome to our app!".to_string(),
        body: "Thanks for signing up.".to_string(),
    },
    Default::default(), // Use default job options
).await?;

// Or use the raw method when the type isn't available:
utils.add_raw_job(
    "send_email",
    serde_json::json!({
        "to": "user@example.com",
        "subject": "Welcome to our app!",
        "body": "Thanks for signing up."
    }),
    Default::default(),
).await?;
§Option C: Batch job scheduling
For efficiency when adding many jobs at once, use batch methods:
// Batch add jobs of the same type (type-safe)
let spec = JobSpec::default();
utils.add_jobs::<SendEmail>(&[
    (SendEmail { to: "user1@example.com".into(), subject: "Hello".into(), body: "...".into() }, &spec),
    (SendEmail { to: "user2@example.com".into(), subject: "Hello".into(), body: "...".into() }, &spec),
    (SendEmail { to: "user3@example.com".into(), subject: "Hello".into(), body: "...".into() }, &spec),
]).await?;

// Batch add jobs of different types (dynamic)
utils.add_raw_jobs(&[
    RawJobSpec {
        identifier: "send_email".into(),
        // Include every field SendEmail requires, or deserialization fails when the job runs
        payload: serde_json::json!({ "to": "user@example.com", "subject": "Hi", "body": "..." }),
        spec: JobSpec::default(),
    },
    RawJobSpec {
        identifier: "process_payment".into(),
        payload: serde_json::json!({ "user_id": 123, "amount": 50 }),
        spec: JobSpec::default(),
    },
]).await?;
§Advanced Features
§Shared Application State
You can provide shared state to all your tasks using extensions:
use serde::{Deserialize, Serialize};
use graphile_worker::{WorkerContext, TaskHandler, IntoTaskHandlerResult};
use std::sync::{Arc, atomic::{AtomicUsize, Ordering::SeqCst}};

// Define your shared state
#[derive(Clone, Debug)]
struct AppState {
    db_client: Arc<DatabaseClient>,
    api_key: String,
    counter: Arc<AtomicUsize>,
}

// Example database client (just for demonstration).
// Debug is derived so that the Debug derive on AppState compiles.
#[derive(Debug)]
struct DatabaseClient;

impl DatabaseClient {
    fn new() -> Self { Self }
    async fn find_user(&self, _user_id: &str) -> Result<(), String> { Ok(()) }
}

#[derive(Deserialize, Serialize)]
struct ProcessUserTask {
    user_id: String,
}

impl TaskHandler for ProcessUserTask {
    const IDENTIFIER: &'static str = "process_user";

    async fn run(self, ctx: WorkerContext) -> impl IntoTaskHandlerResult {
        // Access the shared state in your task
        let app_state = ctx.get_ext::<AppState>().unwrap();
        let count = app_state.counter.fetch_add(1, SeqCst);

        // Use shared resources
        app_state.db_client.find_user(&self.user_id).await?;

        println!("Processed user {}, task count: {}", self.user_id, count);
        Ok::<(), String>(())
    }
}

// Add the extension when configuring the worker
let app_state = AppState {
    db_client: Arc::new(DatabaseClient::new()),
    api_key: "secret_key".to_string(),
    counter: Arc::new(AtomicUsize::new(0)),
};

graphile_worker::WorkerOptions::default()
    .add_extension(app_state)
    .define_job::<ProcessUserTask>()
    // ... other configuration
    .init()
    .await?;
§Scheduling Options
You can customize how and when jobs run with the JobSpec builder:
use graphile_worker::{JobSpecBuilder, JobKeyMode};
use chrono::Utc;

// Schedule a job to run in 5 minutes with an explicit priority
// (upstream Graphile Worker runs numerically smaller priorities first)
let job_spec = JobSpecBuilder::new()
    .run_at(Utc::now() + chrono::Duration::minutes(5))
    .priority(10)
    .job_key("welcome_email_user_123") // Unique identifier for deduplication
    .job_key_mode(JobKeyMode::Replace) // Replace existing jobs with this key
    .max_attempts(5) // Max retry attempts (default is 25)
    .build();

utils.add_job(SendEmail { /* ... */ }, job_spec).await?;
§Job Queues
Jobs with the same queue name run in series (one after another) rather than in parallel:
// These jobs will run one after another, not concurrently
let spec1 = JobSpecBuilder::new()
    .queue_name("user_123_operations")
    .build();
let spec2 = JobSpecBuilder::new()
    .queue_name("user_123_operations")
    .build();

utils.add_job(UpdateProfile { /* ... */ }, spec1).await?;
utils.add_job(SendEmail { /* ... */ }, spec2).await?;
§Cron Jobs
You can schedule recurring jobs using crontab syntax:
// Run a task daily at 8:00 AM
let worker = WorkerOptions::default()
    .with_crontab("0 8 * * * send_daily_report")?
    .define_job::<SendDailyReport>()
    // ... other configuration
    .init()
    .await?;
§Local Queue
The Local Queue feature batch-fetches jobs from the database and caches them locally, significantly reducing database load in high-throughput scenarios.
use graphile_worker::{WorkerOptions, LocalQueueConfig, RefetchDelayConfig};
use std::time::Duration;

let worker = WorkerOptions::default()
    .local_queue(
        LocalQueueConfig::default()
            .with_size(100) // Cache up to 100 jobs
            .with_ttl(Duration::from_secs(300)) // Return unclaimed jobs after 5 minutes
            .with_refetch_delay(
                RefetchDelayConfig::default()
                    .with_duration(Duration::from_millis(100)) // Delay between refetches
                    .with_threshold(10) // Refetch when queue drops below 10
            )
    )
    .define_job::<SendEmail>()
    .pg_pool(pg_pool)
    .init()
    .await?;
The Local Queue operates in several modes:
- Polling: Actively fetching jobs from the database
- Waiting: Jobs are cached locally, serving from cache
- TtlExpired: Cache TTL expired, returning jobs to database
Key benefits:
- Reduces database round-trips by fetching jobs in batches
- Configurable cache size and TTL
- Automatic return of unclaimed jobs on shutdown or TTL expiry
- Refetch delay prevents thundering herd on empty queues
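The modes and benefits listed above can be pictured as a small state machine. The following is a simplified sketch under stated assumptions, not the crate's implementation: `LocalQueueSketch`, its methods, and the exact transitions are hypothetical, and a `VecDeque<u64>` of job ids stands in for the database.

```rust
use std::collections::VecDeque;

/// Mirrors the three modes described above (local stand-in type, not the crate's enum).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Mode {
    Polling,
    Waiting,
    TtlExpired,
}

/// Hypothetical local cache of job ids sitting in front of a "database" queue.
struct LocalQueueSketch {
    mode: Mode,
    cache: VecDeque<u64>,
    size: usize,
}

impl LocalQueueSketch {
    fn new(size: usize) -> Self {
        Self { mode: Mode::Polling, cache: VecDeque::new(), size }
    }

    /// One batch fetch: move up to `size` jobs from the database into the cache.
    fn fetch_batch(&mut self, db: &mut VecDeque<u64>) {
        while self.cache.len() < self.size {
            match db.pop_front() {
                Some(id) => self.cache.push_back(id),
                None => break,
            }
        }
        // With cached jobs we serve locally; otherwise we keep polling.
        self.mode = if self.cache.is_empty() { Mode::Polling } else { Mode::Waiting };
    }

    /// Serve a job from the cache without touching the database.
    fn next_job(&mut self) -> Option<u64> {
        if self.mode != Mode::Waiting {
            return None;
        }
        let job = self.cache.pop_front();
        if self.cache.is_empty() {
            self.mode = Mode::Polling;
        }
        job
    }

    /// TTL expiry: hand unclaimed jobs back to the database in their original order.
    fn expire_ttl(&mut self, db: &mut VecDeque<u64>) {
        self.mode = Mode::TtlExpired;
        while let Some(id) = self.cache.pop_back() {
            db.push_front(id);
        }
    }
}
```

The point of the design is that `next_job` needs no database round-trip while the cache holds jobs, and `expire_ttl` keeps cached jobs from being stranded on an idle worker.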
§Lifecycle Hooks
You can observe and intercept job lifecycle events using plugins that implement the Plugin trait. This is useful for logging, metrics, validation, and custom job handling logic.
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use graphile_worker::{
    Plugin, HookRegistry, HookResult,
    WorkerStart, JobStart, JobComplete, JobFail, BeforeJobRun,
};

struct MetricsPlugin {
    jobs_started: AtomicU64,
    jobs_completed: AtomicU64,
}

impl Plugin for MetricsPlugin {
    fn register(self, hooks: &mut HookRegistry) {
        hooks.on(WorkerStart, async |ctx| {
            println!("Worker {} started", ctx.worker_id);
        });

        let jobs_started = Arc::new(self.jobs_started);
        let jobs_completed = Arc::new(self.jobs_completed);

        {
            let jobs_started = jobs_started.clone();
            hooks.on(JobStart, move |ctx| {
                let jobs_started = jobs_started.clone();
                async move {
                    jobs_started.fetch_add(1, Ordering::Relaxed);
                    println!("Job {} started", ctx.job.id());
                }
            });
        }

        {
            let jobs_completed = jobs_completed.clone();
            hooks.on(JobComplete, move |ctx| {
                let jobs_completed = jobs_completed.clone();
                async move {
                    jobs_completed.fetch_add(1, Ordering::Relaxed);
                    println!("Job {} completed in {:?}", ctx.job.id(), ctx.duration);
                }
            });
        }

        hooks.on(JobFail, async |ctx| {
            println!("Job {} failed: {}", ctx.job.id(), ctx.error);
        });
    }
}
§Intercepting Jobs
The BeforeJobRun and AfterJobRun hooks can intercept jobs and change their behavior:
struct ValidationPlugin;

impl Plugin for ValidationPlugin {
    fn register(self, hooks: &mut HookRegistry) {
        hooks.on(BeforeJobRun, async |ctx| {
            // Skip jobs with a "skip" flag in their payload
            if ctx.payload.get("skip").and_then(|v| v.as_bool()).unwrap_or(false) {
                return HookResult::Skip;
            }

            // Fail jobs with invalid data
            if ctx.payload.get("invalid").is_some() {
                return HookResult::Fail("Invalid payload".into());
            }

            // Continue with normal execution
            HookResult::Continue
        });
    }
}
§Registering Plugins
Add plugins when configuring the worker:
let worker = WorkerOptions::default()
    .define_job::<SendEmail>()
    .add_plugin(MetricsPlugin::new()) // assumes you have defined a MetricsPlugin::new() constructor
    .add_plugin(ValidationPlugin)
    .pg_pool(pg_pool)
    .init()
    .await?;
Multiple plugins can be registered, and they all receive hook calls in the order they were added.
§Available Hooks
| Hook | Type | Description |
|---|---|---|
| WorkerInit | Observer | Called when worker is initializing |
| WorkerStart | Observer | Called when worker starts processing |
| WorkerShutdown | Observer | Called when worker is shutting down |
| JobFetch | Observer | Called when a job is fetched from the queue |
| JobStart | Observer | Called before a job starts executing |
| JobComplete | Observer | Called after a job completes successfully |
| JobFail | Observer | Called when a job fails (will retry) |
| JobPermanentlyFail | Observer | Called when a job exceeds max attempts |
| CronTick | Observer | Called on each cron scheduler tick |
| CronJobScheduled | Observer | Called when a cron job is scheduled |
| LocalQueueInit | Observer | Called when local queue is initialized |
| LocalQueueSetMode | Observer | Called when local queue changes mode |
| LocalQueueGetJobsComplete | Observer | Called after batch fetching jobs |
| LocalQueueReturnJobs | Observer | Called when jobs are returned to database |
| LocalQueueRefetchDelayStart | Observer | Called when refetch delay starts |
| LocalQueueRefetchDelayAbort | Observer | Called when refetch delay is aborted |
| LocalQueueRefetchDelayExpired | Observer | Called when refetch delay expires |
| BeforeJobRun | Interceptor | Can skip, fail, or continue job execution |
| AfterJobRun | Interceptor | Can modify the job result after execution |
| BeforeJobSchedule | Interceptor | Can skip, fail, or transform job before scheduling |
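The difference between the two hook types in the table is that an observer only watches, while an interceptor returns a decision that can stop the job. Here is a minimal, dependency-free sketch of how a chain of interceptors might be evaluated in registration order. Everything in it is a hypothetical stand-in: `HookOutcome` only mirrors the Continue/Skip/Fail variants used earlier, `Payload` replaces the JSON payload with a string map, and stopping at the first non-Continue result is an assumption rather than documented behaviour.

```rust
use std::collections::HashMap;

/// Local stand-in mirroring the Continue / Skip / Fail decisions of an interceptor.
#[derive(Debug, Clone, PartialEq)]
enum HookOutcome {
    Continue,
    Skip,
    Fail(String),
}

/// Simplified payload: a string map instead of a JSON value.
type Payload = HashMap<String, String>;
type Interceptor = Box<dyn Fn(&Payload) -> HookOutcome>;

/// Hypothetical registry holding interceptors in the order they were added.
#[derive(Default)]
struct InterceptorChain {
    before_job_run: Vec<Interceptor>,
}

impl InterceptorChain {
    fn on_before_job_run(&mut self, hook: impl Fn(&Payload) -> HookOutcome + 'static) {
        self.before_job_run.push(Box::new(hook));
    }

    /// Runs interceptors in registration order; the first one that does not
    /// return `Continue` decides the outcome (assumed short-circuit behaviour).
    fn run_before_job(&self, payload: &Payload) -> HookOutcome {
        for hook in &self.before_job_run {
            match hook(payload) {
                HookOutcome::Continue => continue,
                decided => return decided,
            }
        }
        HookOutcome::Continue
    }
}
```

Registering a "skip" check before an "invalid" check means a payload carrying both flags is skipped rather than failed, which is why registration order matters for interceptors.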
§Job Management Utilities
The WorkerUtils struct provides methods for managing jobs:
// Get a WorkerUtils instance
let utils = worker.create_utils();

// Remove a job by its key
utils.remove_job("job_key_123").await?;

// Mark jobs as completed
utils.complete_jobs(&[job_id1, job_id2]).await?;

// Permanently fail jobs with a reason
utils.permanently_fail_jobs(&[job_id3, job_id4], "Invalid data").await?;

// Reschedule jobs
let options = RescheduleJobOptions {
    run_at: Some(Utc::now() + chrono::Duration::minutes(60)),
    priority: Some(5),
    max_attempts: Some(3),
    ..Default::default()
};
utils.reschedule_jobs(&[job_id5, job_id6], options).await?;

// Run database cleanup tasks
utils.cleanup(&[
    CleanupTask::DeletePermenantlyFailedJobs,
    CleanupTask::GcTaskIdentifiers,
    CleanupTask::GcJobQueues,
]).await?;
§Feature List
- Flexible deployment: Run standalone or embedded in your application
- Multi-language support: Use from Rust, SQL or alongside the Node.js version
- Performance optimized:
  - Low latency job execution (typically under 3ms)
  - PostgreSQL LISTEN/NOTIFY for immediate job notifications
  - SKIP LOCKED for efficient job fetching
- Robust job processing:
  - Parallel processing with customizable concurrency
  - Serialized execution via named queues
  - Automatic retries with exponential backoff
  - Customizable retry counts (default: 25 attempts over ~3 days)
- Scheduling features:
  - Delayed execution with run_at
  - Job prioritization
  - Crontab-like recurring tasks
  - Task deduplication via job_key
- Lifecycle hooks: Observe and intercept job events for logging, metrics, and validation
- Type safety: End-to-end type checking of job payloads
- Minimal overhead: Direct serialization of task payloads
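To give a feel for the exponential backoff mentioned in the list, the sketch below computes retry delays under an assumed formula. Upstream Graphile Worker documents the delay after a failure as exp(min(attempts, 10)) seconds, and this example assumes the Rust port schedules retries the same way. `retry_delay_secs` and `retry_schedule` are hypothetical helper names, not part of the crate.

```rust
/// Delay in seconds before the next retry, after `attempts` attempts have failed.
/// Assumed formula (from upstream Graphile Worker): exp(min(attempts, 10)).
fn retry_delay_secs(attempts: u32) -> f64 {
    f64::from(attempts.min(10)).exp()
}

/// Delays between consecutive attempts for a job allowed `max_attempts` tries.
/// A job with N attempts has N - 1 waits between them.
fn retry_schedule(max_attempts: u32) -> Vec<f64> {
    (1..max_attempts).map(retry_delay_secs).collect()
}
```

Under this assumption the first retry waits about 2.7 seconds, the delay grows by a factor of e with each failure, and from the tenth failure onward it stays at exp(10) seconds, roughly 6.1 hours.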
§Requirements
- PostgreSQL 12+
  - Required for the generated always as (expression) feature
  - May work with older versions but has not been tested
§Project Status
Production ready but the API may continue to evolve. If you encounter any issues or have feature requests, please open an issue on GitHub.
§Acknowledgments
This library is a Rust port of the excellent Graphile Worker by Benjie Gillam. If you find this library useful, please consider sponsoring Benjie’s work, as all the research and architecture design was done by him.
§License
MIT License - See LICENSE.md
§Graphile Worker RS
A PostgreSQL-backed job queue implementation for Rust applications. This crate is a Rust port of the Node.js Graphile Worker library.
§Architecture Overview
Graphile Worker uses PostgreSQL as its backend for job storage and coordination. The system consists of several key components:
- Worker: Processes jobs from the queue using the specified concurrency.
- WorkerUtils: Utility functions for job management (adding, removing, rescheduling, etc.).
- TaskHandler: Trait that defines how specific job types are processed.
- Job Specification: Configures job parameters like priority, retry behavior, and scheduling.
- Migrations: Automatic schema management for the database tables.
§Database Schema
Graphile Worker manages its own database schema (default: graphile_worker).
It automatically handles migrations and uses the following tables:
- _private_jobs: Stores job data, state, and execution metadata
- _private_tasks: Tracks registered task types
- _private_job_queues: Manages job queue names for serialized job execution
- _private_workers: Tracks active worker instances
§Module Structure
The crate is organized into the following modules:
§Re-exports
- pub use builder::WorkerBuildError;
- pub use builder::WorkerOptions;
- pub use context_ext::WorkerContextExt;
- pub use local_queue::LocalQueue;
- pub use local_queue::LocalQueueConfig;
- pub use local_queue::LocalQueueError;
- pub use local_queue::RefetchDelayConfig;
- pub use runner::Worker;
- pub use sql::add_job::RawJobSpec;
- pub use worker_utils::WorkerUtils;
- pub use crate::job_spec::*;
§Modules
- builder: Configuration and initialization of worker instances
- context_ext
- errors: Error types used throughout the crate
- job_spec: Job specification and builder for configuring jobs
- local_queue: LocalQueue for batch-fetching jobs to improve throughput
- runner: Core worker implementation for running the job queue
- sql: SQL query implementations for interacting with the database
- streams: Job stream management for processing jobs
- utils: General utility functions
- worker_utils: Utility functions for job management
§Structs
- AfterJobRun
- AfterJobRunContext
- BeforeJobRun
- BeforeJobRunContext
- BeforeJobSchedule
- BeforeJobScheduleContext
- CronJobScheduled
- CronJobScheduledContext
- CronTick
- CronTickContext
- DbJob: DbJob represents a job as stored in the database.
- HookRegistry
- Job: Job extends DbJob with an additional task_identifier field.
- JobBuilder: Builder for Job.
- JobComplete
- JobCompleteContext
- JobFail
- JobFailContext
- JobFetch
- JobFetchContext
- JobPermanentlyFail
- JobPermanentlyFailContext
- JobStart
- JobStartContext
- LocalQueueGetJobsComplete
- LocalQueueGetJobsCompleteContext
- LocalQueueInit
- LocalQueueInitContext
- LocalQueueRefetchDelayAbort
- LocalQueueRefetchDelayAbortContext
- LocalQueueRefetchDelayExpired
- LocalQueueRefetchDelayExpiredContext
- LocalQueueRefetchDelayStart
- LocalQueueRefetchDelayStartContext
- LocalQueueReturnJobs
- LocalQueueReturnJobsContext
- LocalQueueSetMode
- LocalQueueSetModeContext
- SharedTaskDetails
- TaskDetails
- WorkerContext: Context provided to task handlers when processing a job.
- WorkerContextBuilder: Builder for WorkerContext.
- WorkerInit
- WorkerInitContext
- WorkerShutdown
- WorkerShutdownContext
- WorkerStart
- WorkerStartContext
§Enums
- HookResult
- JobBuilderError: Error type for JobBuilder
- JobScheduleResult
- LocalQueueMode
- ShutdownReason
- WorkerContextBuilderError: Error type for WorkerContextBuilder
§Traits
- Event
- HookOutput
- Interceptable
- IntoTaskHandlerResult: Trait for converting task handler return types into a standardized Result.
- Plugin
- TaskHandler: Core trait for defining task handlers in Graphile Worker.
§Functions
- parse_crontab: Parse a crontab definition into a Vec of crontab
- run_task_from_worker_ctx: Internal function to execute a task handler from a worker context.