Skip to main content

Crate graphile_worker

Crate graphile_worker 

Source
Expand description

§Graphile Worker RS

Codecov Crates.io Documentation MIT License

A powerful PostgreSQL-backed job queue for Rust applications, based on Graphile Worker. This is a complete Rust rewrite that offers excellent performance, reliability, and a convenient API.

§Overview

Graphile Worker RS allows you to run jobs (such as sending emails, performing calculations, generating PDFs) in the background, so your HTTP responses and application code remain fast and responsive. It’s ideal for any PostgreSQL-backed Rust application.

Key highlights:

  • High performance: Uses PostgreSQL’s SKIP LOCKED for efficient job fetching
  • Low latency: Typically under 3ms from task schedule to execution using LISTEN/NOTIFY
  • Reliable: Automatically retries failed jobs with exponential backoff
  • Flexible: Supports scheduled jobs, task queues, cron-like recurring tasks, and more
  • Type-safe: Uses Rust’s type system to ensure job payloads match their handlers

§Differences from Node.js version

This port is mostly compatible with the original Graphile Worker, meaning you can run it side by side with the Node.js version. The key differences are:

  • In the Node.js version, each process has its own worker_id. In the Rust version, there is only one worker_id, and jobs are processed in your async runtime thread

§Installation

Add the library to your project:

cargo add graphile_worker

Tokio is the default runtime:

graphile_worker = { version = "0.11.4", features = ["tls-rustls"] }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
futures = "0.3"

To use async-std instead, disable default features and enable runtime-async-std. Applications using #[async_std::main] also need async-std’s attributes feature:

graphile_worker = { version = "0.11.4", default-features = false, features = ["runtime-async-std", "tls-rustls"] }
async-std = { version = "1", features = ["attributes"] }
futures = "0.3"

§Getting Started

§1. Define a Task

A task consists of a struct that implements the TaskHandler trait. Each task has:

  • A struct with Serialize/Deserialize for the payload
  • A unique identifier string
  • An async run method that contains the task’s logic
use serde::{Deserialize, Serialize};
use graphile_worker::{WorkerContext, TaskHandler, IntoTaskHandlerResult};

#[derive(Deserialize, Serialize)]
struct SendEmail {
    to: String,
    subject: String,
    body: String,
}

impl TaskHandler for SendEmail {
    const IDENTIFIER: &'static str = "send_email";

    async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
        println!("Sending email to {} with subject '{}'", self.to, self.subject);
        // Email sending logic would go here
        Ok::<(), String>(())
    }
}

§2. Configure and Run the Worker

Set up the worker with your configuration options and run it. Use the entrypoint for the runtime you enabled:

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    run_worker().await
}
#[async_std::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    run_worker().await
}

The worker setup itself is runtime-neutral:

async fn run_worker() -> Result<(), Box<dyn std::error::Error>> {
    // Create a PostgreSQL connection pool
    let pg_pool = sqlx::postgres::PgPoolOptions::new()
        .max_connections(5)
        .connect("postgres://postgres:password@localhost/mydb")
        .await?;

    // Initialize and run the worker
    graphile_worker::WorkerOptions::default()
        .concurrency(5)                 // Process up to 5 jobs concurrently
        .schema("graphile_worker")      // Use this PostgreSQL schema
        .define_job::<SendEmail>()      // Register the task handler
        .pg_pool(pg_pool)               // Provide the database connection
        .init()                         // Initialize the worker
        .await?
        .run()                          // Start processing jobs
        .await?;

    Ok(())
}

For larger applications, modules can expose their jobs as reusable definitions and register them together:

use graphile_worker::{JobDefinition, TaskHandler};

pub fn jobs() -> [JobDefinition; 2] {
    [
        SendEmail::definition(),
        SendDailyReport::definition(),
    ]
}

graphile_worker::WorkerOptions::default()
    .define_jobs(jobs())
    // ... other configuration
    .init()
    .await?;
§Custom shutdown handling

Graphile Worker installs OS-level signal handlers (like SIGINT/SIGTERM) so it can shut down gracefully when you press Ctrl+C. If your application already owns the shutdown lifecycle, disable the built-in listeners and call Worker::request_shutdown() when your orchestrator asks the worker to stop:

use futures::FutureExt;

let worker = graphile_worker::WorkerOptions::default()
    .listen_os_shutdown_signals(false) // prevent installing Ctrl+C handlers
    // ... other configuration
    .init()
    .await?;

let run_loop = worker.run().fuse();
let shutdown = on_shutdown().fuse();
futures::pin_mut!(run_loop, shutdown);

futures::select_biased! {
    // Main worker loop
    result = run_loop => result?,
    // Notify the worker when the host framework wants to stop
    () = shutdown => {
        worker.request_shutdown();
        run_loop.await?; // drain gracefully before returning
    }
}

§3. Schedule Jobs

§Option A: Schedule a job via SQL

Connect to your database and run the following SQL:

SELECT graphile_worker.add_job(
    'send_email',
    json_build_object(
        'to', 'user@example.com',
        'subject', 'Welcome to our app!',
        'body', 'Thanks for signing up.'
    )
);
§Option B: Schedule a job from Rust
// Get a WorkerUtils instance to manage jobs
let utils = worker.create_utils();

// Type-safe method (recommended):
utils.add_job(
    SendEmail {
        to: "user@example.com".to_string(),
        subject: "Welcome to our app!".to_string(),
        body: "Thanks for signing up.".to_string(),
    },
    Default::default(), // Use default job options
).await?;

// Or use the raw method when type isn't available:
utils.add_raw_job(
    "send_email",
    serde_json::json!({
        "to": "user@example.com",
        "subject": "Welcome to our app!",
        "body": "Thanks for signing up."
    }),
    Default::default(),
).await?;
§Option C: Batch job scheduling

For efficiency when adding many jobs at once, use batch methods:

// Batch add jobs of the same type (type-safe)
let spec = JobSpec::default();
utils.add_jobs::<SendEmail>(&[
    (SendEmail { to: "user1@example.com".into(), subject: "Hello".into(), body: "...".into() }, &spec),
    (SendEmail { to: "user2@example.com".into(), subject: "Hello".into(), body: "...".into() }, &spec),
    (SendEmail { to: "user3@example.com".into(), subject: "Hello".into(), body: "...".into() }, &spec),
]).await?;

// Batch add jobs of different types (dynamic)
utils.add_raw_jobs(&[
    RawJobSpec {
        identifier: "send_email".into(),
        payload: serde_json::json!({ "to": "user@example.com", "subject": "Hi" }),
        spec: JobSpec::default(),
    },
    RawJobSpec {
        identifier: "process_payment".into(),
        payload: serde_json::json!({ "user_id": 123, "amount": 50 }),
        spec: JobSpec::default(),
    },
]).await?;
§Option D: Batch job processing

Batch jobs store a JSON array in a single job. Batch handlers can return per-item results, allowing the worker to remove successful items and retry only the failed items.

use graphile_worker::{
    BatchTaskHandler, IntoBatchTaskHandlerResult, JobKeyMode, JobSpecBuilder, WorkerContext,
    WorkerOptions,
};
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize)]
struct PendingNotification {
    user_id: String,
    message_id: String,
}

impl BatchTaskHandler for PendingNotification {
    const IDENTIFIER: &'static str = "send_notifications";

    async fn run_batch(
        items: Vec<Self>,
        _ctx: WorkerContext,
    ) -> impl IntoBatchTaskHandlerResult {
        futures::future::join_all(items.into_iter().map(|item| async move {
            send_notification(item).await.map_err(|error| error.to_string())
        }))
        .await
    }
}

let worker = WorkerOptions::default()
    .define_batch_job::<PendingNotification>()
    // ... other configuration
    .init()
    .await?;

worker.create_utils()
    .add_batch_job(
        vec![
            PendingNotification { user_id: "1".into(), message_id: "a".into() },
            PendingNotification { user_id: "1".into(), message_id: "b".into() },
        ],
        JobSpecBuilder::new()
            .job_key("notifications:1")
            .job_key_mode(JobKeyMode::PreserveRunAt)
            .build(),
    )
    .await?;

§Advanced Features

§Shared Application State

You can provide shared state to all your tasks using extensions:

use serde::{Deserialize, Serialize};
use graphile_worker::{WorkerContext, TaskHandler, IntoTaskHandlerResult};
use std::sync::{Arc, atomic::{AtomicUsize, Ordering::SeqCst}};

// Define your shared state
#[derive(Clone, Debug)]
struct AppState {
    db_client: Arc<DatabaseClient>,
    api_key: String,
    counter: Arc<AtomicUsize>,
}

// Example database client (just for demonstration)
struct DatabaseClient;
impl DatabaseClient {
    fn new() -> Self { Self }
    async fn find_user(&self, _user_id: &str) -> Result<(), String> { Ok(()) }
}

#[derive(Deserialize, Serialize)]
struct ProcessUserTask {
    user_id: String,
}

impl TaskHandler for ProcessUserTask {
    const IDENTIFIER: &'static str = "process_user";

    async fn run(self, ctx: WorkerContext) -> impl IntoTaskHandlerResult {
        // Access the shared state in your task
        let app_state = ctx.get_ext::<AppState>().unwrap();
        let count = app_state.counter.fetch_add(1, SeqCst);
        
        // Use shared resources
        app_state.db_client.find_user(&self.user_id).await?;
        
        println!("Processed user {}, task count: {}", self.user_id, count);
        Ok::<(), String>(())
    }
}

// Add the extension when configuring the worker
let app_state = AppState {
    db_client: Arc::new(DatabaseClient::new()),
    api_key: "secret_key".to_string(),
    counter: Arc::new(AtomicUsize::new(0)),
};

graphile_worker::WorkerOptions::default()
    .add_extension(app_state)
    .define_job::<ProcessUserTask>()
    // ... other configuration
    .init()
    .await?;

§Scheduling Options

You can customize how and when jobs run with the JobSpec builder:

use graphile_worker::{JobSpecBuilder, JobKeyMode};
use chrono::Utc;

// Schedule a job to run after 5 minutes with high priority
let job_spec = JobSpecBuilder::new()
    .run_at(Utc::now() + chrono::Duration::minutes(5))
    .priority(10)
    .job_key("welcome_email_user_123")  // Unique identifier for deduplication
    .job_key_mode(JobKeyMode::Replace)  // Replace existing jobs with this key
    .max_attempts(5)                    // Max retry attempts (default is 25)
    .build();

utils.add_job(SendEmail { /* ... */ }, job_spec).await?;

§Job Queues

Jobs with the same queue name run in series (one after another) rather than in parallel:

// These jobs will run one after another, not concurrently
let spec1 = JobSpecBuilder::new()
    .queue_name("user_123_operations")
    .build();

let spec2 = JobSpecBuilder::new()
    .queue_name("user_123_operations")
    .build();

utils.add_job(UpdateProfile { /* ... */ }, spec1).await?;
utils.add_job(SendEmail { /* ... */ }, spec2).await?;

§Cron Jobs

You can schedule recurring jobs with the typed cron API:

use graphile_worker::{Cron, CrontabFill, WorkerOptions};

// Run a task daily at 8:00 AM UTC, with one hour of backfill.
let worker = WorkerOptions::default()
    .define_job::<SendDailyReport>()
    .with_cron(
        Cron::daily_at::<SendDailyReport>(8, 0)?
            .fill(CrontabFill::hours(1)),
    )
    // ... other configuration
    .init()
    .await?;

Crontab text is still supported when you need Graphile Worker’s file-style syntax:

let worker = WorkerOptions::default()
    .define_job::<SendDailyReport>()
    .with_cron("0 8 * * * send_daily_report")?
    // ... other configuration
    .init()
    .await?;

with_crontab(...) remains available as a deprecated compatibility alias.

§Local Queue

The Local Queue feature batch-fetches jobs from the database and caches them locally, significantly reducing database load in high-throughput scenarios.

use graphile_worker::{WorkerOptions, LocalQueueConfig, RefetchDelayConfig};
use std::time::Duration;

let worker = WorkerOptions::default()
    .local_queue(
        LocalQueueConfig::default()
            .with_size(100)                              // Cache up to 100 jobs
            .with_ttl(Duration::from_secs(300))          // Return unclaimed jobs after 5 minutes
            .with_refetch_delay(
                RefetchDelayConfig::default()
                    .with_duration(Duration::from_millis(100))  // Delay between refetches
                    .with_threshold(10)                         // Refetch when queue drops below 10
            )
    )
    .define_job::<SendEmail>()
    .pg_pool(pg_pool)
    .init()
    .await?;

The Local Queue operates in several modes:

  • Polling: Actively fetching jobs from the database
  • Waiting: Jobs are cached locally, serving from cache
  • TtlExpired: Cache TTL expired, returning jobs to database

Key benefits:

  • Reduces database round-trips by fetching jobs in batches
  • Configurable cache size and TTL
  • Automatic return of unclaimed jobs on shutdown or TTL expiry
  • Refetch delay prevents thundering herd on empty queues

§Lifecycle Hooks

You can observe and intercept job lifecycle events using plugins that implement the Plugin trait. This is useful for logging, metrics, validation, and custom job handling logic.

use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use graphile_worker::{
    Plugin, HookRegistry, HookResult,
    WorkerStart, JobStart, JobComplete, JobFail, BeforeJobRun,
};

struct MetricsPlugin {
    jobs_started: AtomicU64,
    jobs_completed: AtomicU64,
}

impl Plugin for MetricsPlugin {
    fn register(self, hooks: &mut HookRegistry) {
        hooks.on(WorkerStart, async |ctx| {
            println!("Worker {} started", ctx.worker_id);
        });

        let jobs_started = Arc::new(self.jobs_started);
        let jobs_completed = Arc::new(self.jobs_completed);

        {
            let jobs_started = jobs_started.clone();
            hooks.on(JobStart, move |ctx| {
                let jobs_started = jobs_started.clone();
                async move {
                    jobs_started.fetch_add(1, Ordering::Relaxed);
                    println!("Job {} started", ctx.job.id());
                }
            });
        }

        {
            let jobs_completed = jobs_completed.clone();
            hooks.on(JobComplete, move |ctx| {
                let jobs_completed = jobs_completed.clone();
                async move {
                    jobs_completed.fetch_add(1, Ordering::Relaxed);
                    println!("Job {} completed in {:?}", ctx.job.id(), ctx.duration);
                }
            });
        }

        hooks.on(JobFail, async |ctx| {
            println!("Job {} failed: {}", ctx.job.id(), ctx.error);
        });
    }
}
§Intercepting Jobs

The BeforeJobRun and AfterJobRun hooks can intercept jobs and change their behavior:

struct ValidationPlugin;

impl Plugin for ValidationPlugin {
    fn register(self, hooks: &mut HookRegistry) {
        hooks.on(BeforeJobRun, async |ctx| {
            // Skip jobs with a "skip" flag in their payload
            if ctx.payload.get("skip").and_then(|v| v.as_bool()).unwrap_or(false) {
                return HookResult::Skip;
            }

            // Fail jobs with invalid data
            if ctx.payload.get("invalid").is_some() {
                return HookResult::Fail("Invalid payload".into());
            }

            // Continue with normal execution
            HookResult::Continue
        });
    }
}
§Registering Plugins

Add plugins when configuring the worker:

let worker = WorkerOptions::default()
    .define_job::<SendEmail>()
    .add_plugin(MetricsPlugin::new())
    .add_plugin(ValidationPlugin)
    .pg_pool(pg_pool)
    .init()
    .await?;

Multiple plugins can be registered and they will all receive hook calls in the order they were added.

§Available Hooks
HookTypeDescription
WorkerInitObserverCalled when worker is initializing
WorkerStartObserverCalled when worker starts processing
WorkerShutdownObserverCalled when worker is shutting down
JobFetchObserverCalled when a job is fetched from the queue
JobStartObserverCalled before a job starts executing
JobCompleteObserverCalled after a job completes successfully
JobFailObserverCalled when a job fails (will retry)
JobPermanentlyFailObserverCalled when a job exceeds max attempts
CronTickObserverCalled on each cron scheduler tick
CronJobScheduledObserverCalled when a cron job is scheduled
LocalQueueInitObserverCalled when local queue is initialized
LocalQueueSetModeObserverCalled when local queue changes mode
LocalQueueGetJobsCompleteObserverCalled after batch fetching jobs
LocalQueueReturnJobsObserverCalled when jobs are returned to database
LocalQueueRefetchDelayStartObserverCalled when refetch delay starts
LocalQueueRefetchDelayAbortObserverCalled when refetch delay is aborted
LocalQueueRefetchDelayExpiredObserverCalled when refetch delay expires
BeforeJobRunInterceptorCan skip, fail, or continue job execution
AfterJobRunInterceptorCan modify the job result after execution
BeforeJobScheduleInterceptorCan skip, fail, or transform job before scheduling

§Job Management Utilities

The WorkerUtils class provides methods for managing jobs:

// Get a WorkerUtils instance
let utils = worker.create_utils();

// Remove a job by its key
utils.remove_job("job_key_123").await?;

// Mark jobs as completed
utils.complete_jobs(&[job_id1, job_id2]).await?;

// Permanently fail jobs with a reason
utils.permanently_fail_jobs(&[job_id3, job_id4], "Invalid data").await?;

// Reschedule jobs
let options = RescheduleJobOptions {
    run_at: Some(Utc::now() + chrono::Duration::minutes(60)),
    priority: Some(5),
    max_attempts: Some(3),
    ..Default::default()
};
utils.reschedule_jobs(&[job_id5, job_id6], options).await?;

// Run database cleanup tasks
utils.cleanup(&[
    CleanupTask::DeletePermenantlyFailedJobs,
    CleanupTask::GcTaskIdentifiers,
    CleanupTask::GcJobQueues,
]).await?;

§CLI

The workspace includes a dedicated graphile_worker_cli crate with a graphile-worker binary for managing jobs from a terminal.

graphile-worker --database-url postgres://postgres:postgres@localhost/postgres migrate
DATABASE_URL=postgres://postgres:postgres@localhost/postgres graphile-worker add send_email --payload '{"to":"user@example.com"}'
graphile-worker list --state ready
graphile-worker complete 123 124
graphile-worker fail 125 --reason "invalid payload"
graphile-worker reschedule 126 --run-at 2026-01-02T03:04:05Z
graphile-worker remove cli-job-key
graphile-worker cleanup
graphile-worker force-unlock graphile_worker_deadbeef

Use --schema or GRAPHILE_WORKER_SCHEMA for non-default schemas, and --json when machine-readable output is preferred.

The CLI can also serve the embedded Leptos admin UI. It serves its HTML, API, Leptos/WASM client bundle, Tailwind CSS, and icon assets from the same binary. HTTP Basic auth is the default; when no password is configured, the CLI prints a random password at startup.

The crate ships prebuilt admin UI assets, so cargo install graphile_worker_cli does not require npm, wasm-bindgen, or the wasm32-unknown-unknown Rust target. Maintainers can force a source rebuild with GRAPHILE_WORKER_ADMIN_UI_REBUILD=1; that path needs those tools. Use GRAPHILE_WORKER_ADMIN_UI_UPDATE_PREBUILT=1 when intentionally refreshing the checked-in prebuilt assets.

DATABASE_URL=postgres://postgres:postgres@localhost/postgres graphile-worker admin
graphile-worker admin --listen 127.0.0.1:8080 --read-only
graphile-worker admin --auth bearer
graphile-worker admin --auth header --header-name x-admin-token

The admin UI supports dashboard stats, job list filtering, pasted multi-value column filters, row selection, copy JSON/CSV actions, job add/complete/fail/run now/reschedule/remove-by-key actions, queue and worker views, force unlock, cleanup, migrations, theming, and dark mode.

§Feature List

  • Flexible deployment: Run standalone or embedded in your application
  • Multi-language support: Use from Rust, SQL or alongside the Node.js version
  • Performance optimized:
    • Low latency job execution (typically under 3ms)
    • PostgreSQL LISTEN/NOTIFY for immediate job notifications
    • SKIP LOCKED for efficient job fetching
  • Robust job processing:
    • Parallel processing with customizable concurrency
    • Serialized execution via named queues
    • Automatic retries with exponential backoff
    • Customizable retry counts (default: 25 attempts over ~3 days)
  • Scheduling features:
    • Delayed execution with run_at
    • Job prioritization
    • Crontab-like recurring tasks
    • Task deduplication via job_key
    • Batch job processing with partial retry support
  • Lifecycle hooks: Observe and intercept job events for logging, metrics, and validation
  • Type safety: End-to-end type checking of job payloads
  • Minimal overhead: Direct serialization of task payloads

§Requirements

  • PostgreSQL 12+
    • Required for the generated always as (expression) feature
    • May work with older versions but has not been tested

§Project Status

Production ready but the API may continue to evolve. If you encounter any issues or have feature requests, please open an issue on GitHub.

§Acknowledgments

This library is a Rust port of the excellent Graphile Worker by Benjie Gillam. If you find this library useful, please consider sponsoring Benjie’s work, as all the research and architecture design was done by him.

§License

MIT License - See LICENSE.md

§Graphile Worker RS

A PostgreSQL-backed job queue implementation for Rust applications. This crate is a Rust port of the Node.js Graphile Worker library.

§Architecture Overview

Graphile Worker uses PostgreSQL as its backend for job storage and coordination. The system consists of several key components:

  • Worker: Processes jobs from the queue using the specified concurrency.
  • WorkerUtils: Utility functions for job management (adding, removing, rescheduling, etc.).
  • TaskHandler: Trait that defines how specific job types are processed.
  • Job Specification: Configures job parameters like priority, retry behavior, and scheduling.
  • Migrations: Automatic schema management for the database tables.

§Database Schema

Graphile Worker manages its own database schema (default: graphile_worker). It automatically handles migrations and uses the following tables:

  • _private_jobs: Stores job data, state, and execution metadata
  • _private_tasks: Tracks registered task types
  • _private_job_queues: Manages job queue names for serialized job execution
  • _private_workers: Tracks active worker instances

§Module Structure

The crate is organized into the following modules:

Re-exports§

pub use cron::Cron;
pub use cron::CronBuilder;
pub use builder::CronInput;
pub use builder::WorkerBuildError;
pub use builder::WorkerOptions;
pub use context_ext::WorkerContextExt;
pub use local_queue::LocalQueue;
pub use local_queue::LocalQueueConfig;
pub use local_queue::LocalQueueError;
pub use local_queue::RefetchDelayConfig;
pub use runner::Worker;
pub use sql::add_job::RawJobSpec;
pub use worker_utils::WorkerUtils;
pub use crate::job_spec::*;

Modules§

builder
Configuration and initialization of worker instances
context_ext
cron
errors
Error types used throughout the crate
job_spec
Job specification and builder for configuring jobs
local_queue
LocalQueue for batch-fetching jobs to improve throughput
row_mapping
runner
Core worker implementation for running the job queue
sql
SQL query implementations for interacting with the database
sqlx
streams
Job stream management for processing jobs
utils
General utility functions
worker_utils
Utility functions for job management

Structs§

AfterJobRun
AfterJobRunContext
BeforeJobRun
BeforeJobRunContext
BeforeJobSchedule
BeforeJobScheduleContext
CronJobScheduled
CronJobScheduledContext
CronTick
CronTickContext
Crontab
A crontab defines a task to be executed at a specific time(s)
CrontabFill
A crontab fill represents how long a crontab should be backfilled For instance the server is down for 1 hour, the task should be backfilled for 1 hour
CrontabTimer
A crontab timer is a set of crontab values for each field (minutes, hours, days, months, days of week)
Database
DbError
DbJob
DbJob represents a job as stored in the database.
DbParams
DbRow
DbTransaction
HookRegistry
Job
Job extends DbJob with an additional task_identifier field.
JobBuilder
Builder for Job.
JobComplete
JobCompleteContext
JobDefinition
Reusable registration value for a TaskHandler.
JobFail
JobFailContext
JobFetch
JobFetchContext
JobPermanentlyFail
JobPermanentlyFailContext
JobStart
JobStartContext
LocalQueueGetJobsComplete
LocalQueueGetJobsCompleteContext
LocalQueueInit
LocalQueueInitContext
LocalQueueRefetchDelayAbort
LocalQueueRefetchDelayAbortContext
LocalQueueRefetchDelayExpired
LocalQueueRefetchDelayExpiredContext
LocalQueueRefetchDelayStart
LocalQueueRefetchDelayStartContext
LocalQueueReturnJobs
LocalQueueReturnJobsContext
LocalQueueSetMode
LocalQueueSetModeContext
Notification
SharedTaskDetails
TaskDetails
WorkerContext
Context provided to task handlers when processing a job.
WorkerContextBuilder
WorkerInit
WorkerInitContext
WorkerShutdown
WorkerShutdownContext
WorkerStart
WorkerStartContext

Enums§

BatchTaskResult
Result returned by a batch task handler.
CronJobKeyMode
Behavior when an existing job with the same job key is found is controlled by this setting
CrontabField
Crontab time fields used for validating typed schedule construction.
CrontabTimerError
Error returned when typed schedule constructors receive invalid values.
CrontabValue
A crontab value can be a number, a range, a step or any value It is used to represent a crontab value for a specific field (hour, day, month, etc.) When specifying a specific number, range or step, it should be valid for the field (e.g. 0-59 for minutes, 0-23 for hours, etc.)
DbCell
DbValue
HookResult
JobBuilderError
Error type for JobBuilder
JobScheduleResult
LocalQueueMode
ShutdownReason
TaskHandlerOutcome
Outcome returned by type-erased task handlers.

Traits§

BatchTaskHandler
Core trait for defining batch task handlers.
DatabaseDriver
DbExecutor
DbExecutorArg
Event
FromDbCell
HookOutput
Interceptable
IntoBatchTaskHandlerResult
Trait for converting batch task return types into a standardized result.
IntoTaskHandlerResult
Trait for converting task handler return types into a standardized Result.
Plugin
TaskHandler
Core trait for defining task handlers in Graphile Worker.
TransactionDriver

Functions§

parse_crontab
Parse a crontab definition into a Vec of crontab
run_task_from_worker_ctx
Internal function to execute a task handler from a worker context.

Type Aliases§

BoxFuture
NotificationStream
TaskHandlerFn
Type-erased function used by workers to run a task from a WorkerContext.