graphile_worker 0.13.3

High performance Rust/PostgreSQL job queue (also suitable for getting jobs generated by PostgreSQL triggers/functions out into a different work queue)
# First Worker

This page walks through the smallest useful Graphile Worker RS flow:

1. Create a PostgreSQL pool.
2. Define a task handler.
3. Configure `WorkerOptions`.
4. Initialize the worker.
5. Add a job.
6. Run the worker.
7. Let shutdown happen gracefully.

The examples use Tokio and SQLx, matching the default setup used by the
repository examples.

## Define a Handler

A worker processes jobs by matching each job's task identifier to a registered
`TaskHandler`. The handler type is also the payload type, so it should derive
`Serialize` and `Deserialize`.

```rust,ignore
use graphile_worker::{IntoTaskHandlerResult, TaskHandler, WorkerContext};
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize)]
struct SayHello {
    message: String,
}

impl TaskHandler for SayHello {
    const IDENTIFIER: &'static str = "say_hello";

    async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
        println!("Hello {} !", self.message);
        Ok::<(), String>(())
    }
}
```

`IDENTIFIER` is the task name stored in PostgreSQL. Register the same handler
type on any worker that should be able to process jobs with that identifier.
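
To make the identifier-to-handler matching concrete, here is a minimal, standard-library-only sketch of the idea. It is not how Graphile Worker is implemented: `Registry`, `Handler`, and `say_hello` are hypothetical names invented for illustration, and the payload is treated as a plain string rather than deserialized JSON.

```rust
use std::collections::HashMap;

/// Hypothetical handler signature: receives the raw payload and reports
/// success (with a message) or failure (with an error string).
type Handler = fn(&str) -> Result<String, String>;

/// Hypothetical registry keyed by task identifier.
struct Registry {
    handlers: HashMap<&'static str, Handler>,
}

impl Registry {
    fn new() -> Self {
        Registry {
            handlers: HashMap::new(),
        }
    }

    /// Register a handler under its identifier (the role `IDENTIFIER` plays).
    fn register(&mut self, identifier: &'static str, handler: Handler) {
        self.handlers.insert(identifier, handler);
    }

    /// Find the handler for a stored job. Unknown identifiers are an error.
    fn dispatch(&self, identifier: &str, payload: &str) -> Result<String, String> {
        match self.handlers.get(identifier) {
            Some(handler) => handler(payload),
            None => Err(format!("no handler registered for task `{identifier}`")),
        }
    }
}

/// Hypothetical handler body mirroring the `SayHello` example.
fn say_hello(payload: &str) -> Result<String, String> {
    Ok(format!("Hello {payload} !"))
}
```

In this sketch, `registry.dispatch("say_hello", "world")` reaches `say_hello`, while a job with an identifier that was never registered returns an error instead of being processed.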

## Create the Pool

You can pass an existing SQLx PostgreSQL pool with `pg_pool`. This is useful
when your application already owns database configuration.

```rust,ignore
use std::str::FromStr;

use sqlx::postgres::{PgConnectOptions, PgPoolOptions};

let pg_options = PgConnectOptions::from_str(
    "postgres://postgres:root@localhost:5432",
)?;

let pg_pool = PgPoolOptions::new()
    .max_connections(5)
    .connect_with(pg_options)
    .await?;
```

You can also let Graphile Worker create the pool from a URL with
`database_url`. When using `database_url`, `max_pg_conn` controls the pool size
and defaults to 20 if it is not set.

```rust,ignore
let options = graphile_worker::WorkerOptions::default()
    .database_url("postgres://postgres:root@localhost:5432")
    .max_pg_conn(5);
```

If both an existing database connection and `database_url` are provided, the
existing connection takes precedence.
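
The precedence rule and the default pool size can be summarized as a small decision function. This is only a sketch of the behavior described above, using the standard library alone; `ExistingPool`, `ConnectionPlan`, and `plan_connection` are hypothetical names and do not exist in the crate.

```rust
/// Hypothetical stand-in for a pool the application already built.
#[derive(Debug, PartialEq)]
struct ExistingPool {
    max_connections: u32,
}

/// Hypothetical description of where the worker's connections come from.
#[derive(Debug, PartialEq)]
enum ConnectionPlan {
    UseExisting { max_connections: u32 },
    CreateFromUrl { url: String, max_connections: u32 },
    Missing,
}

/// Default pool size documented for `database_url` when `max_pg_conn` is unset.
const DEFAULT_MAX_PG_CONN: u32 = 20;

fn plan_connection(
    existing: Option<ExistingPool>,
    database_url: Option<&str>,
    max_pg_conn: Option<u32>,
) -> ConnectionPlan {
    match (existing, database_url) {
        // An existing connection wins, even if a URL was also supplied.
        (Some(pool), _) => ConnectionPlan::UseExisting {
            max_connections: pool.max_connections,
        },
        // Otherwise build a pool from the URL, sized by `max_pg_conn` or 20.
        (None, Some(url)) => ConnectionPlan::CreateFromUrl {
            url: url.to_string(),
            max_connections: max_pg_conn.unwrap_or(DEFAULT_MAX_PG_CONN),
        },
        // Neither source was provided.
        (None, None) => ConnectionPlan::Missing,
    }
}
```

Note that in this model `max_pg_conn` only matters on the `database_url` path; an existing pool keeps whatever size it was created with.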

## Configure and Initialize

`WorkerOptions` is a builder. The usual first-worker options are:

- `concurrency`: maximum jobs processed at the same time. If omitted, it
  defaults to the number of logical CPUs.
- `schema`: PostgreSQL schema used for Graphile Worker tables. If omitted, it
  defaults to `graphile_worker`.
- `define_job::<T>()`: registers a `TaskHandler` type.
- `pg_pool`, `database`, or `database_url`: provides the database connection.

Calling `init().await` connects to the database if needed, runs migrations for
the selected schema, registers task details, creates the worker id, and returns
a ready `Worker`.

```rust,ignore
use graphile_worker::WorkerOptions;

let worker = WorkerOptions::default()
    .concurrency(2)
    .schema("example_simple_worker")
    .define_job::<SayHello>()
    .pg_pool(pg_pool)
    .init()
    .await?;
```
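
To see what "defaults to the number of logical CPUs" could mean in practice, the following sketch resolves an optional concurrency setting with the standard library. The helper `effective_concurrency` is hypothetical, and the crate may determine the CPU count differently (for example through a dedicated crate), so treat this as an approximation of the documented default.

```rust
use std::num::NonZeroUsize;
use std::thread;

/// Hypothetical helper: use the requested concurrency if given, otherwise
/// fall back to the parallelism the standard library reports for this
/// process, and to 1 if that cannot be determined.
fn effective_concurrency(requested: Option<usize>) -> usize {
    requested.unwrap_or_else(|| {
        thread::available_parallelism()
            .map(NonZeroUsize::get)
            .unwrap_or(1)
    })
}
```

With `.concurrency(2)` as in the example above, the equivalent call would be `effective_concurrency(Some(2))`, which ignores the CPU count entirely.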

## Add a Job

After initialization, create utilities from the worker and enqueue a job.
`JobSpec::default()` schedules it with default job options.

```rust,ignore
use graphile_worker::JobSpec;

let helpers = worker.create_utils();

helpers
    .add_job(
        SayHello {
            message: "world".to_string(),
        },
        JobSpec::default(),
    )
    .await?;
```

To schedule a job for later, build a `JobSpec` with `JobSpecBuilder`.

```rust,ignore
use chrono::{Duration, Utc};
use graphile_worker::JobSpecBuilder;

helpers
    .add_job(
        SayHello {
            message: "world".to_string(),
        },
        JobSpecBuilder::new()
            .run_at(Utc::now() + Duration::seconds(10))
            .build(),
    )
    .await?;
```

## Run the Worker

`run()` starts the long-running worker loop. It registers the worker, starts job
sources, processes jobs until shutdown is requested, waits for batchers to
finish, emits shutdown hooks, and deregisters the worker.

```rust,ignore
worker.run().await?;
```

For scripts and smoke tests, `run_once()` processes currently available jobs and
then returns.

```rust,ignore
worker.run_once().await?;
```
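
The difference between a long-running loop and a single pass can be modeled with an in-memory queue. This is a conceptual sketch only, assuming that "currently available" means jobs whose scheduled time has already passed; `Job` and `run_once_model` are hypothetical names, and the real worker reads jobs from PostgreSQL rather than a `Vec`.

```rust
/// Hypothetical in-memory job: an id and the time (in seconds) it becomes due.
#[derive(Debug)]
struct Job {
    id: u32,
    run_at: u64,
}

/// Process every job that is due at `now`, leave the rest queued, and return
/// the ids that were processed, in their original order.
fn run_once_model(queue: &mut Vec<Job>, now: u64) -> Vec<u32> {
    // Split the queue into jobs that are due and jobs scheduled for later.
    let (due, later): (Vec<Job>, Vec<Job>) =
        queue.drain(..).partition(|job| job.run_at <= now);

    // Jobs scheduled for later stay in the queue for a future pass.
    *queue = later;

    due.into_iter().map(|job| job.id).collect()
}
```

In this model, a job added with a `run_at` ten seconds in the future is skipped by a pass made now and remains queued, which is why a single pass suits scripts and smoke tests rather than continuous processing.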

## Full Example

```rust,ignore
use std::str::FromStr;

use graphile_worker::{
    IntoTaskHandlerResult, JobSpec, TaskHandler, WorkerContext, WorkerOptions,
};
use serde::{Deserialize, Serialize};
use sqlx::postgres::{PgConnectOptions, PgPoolOptions};

#[derive(Deserialize, Serialize)]
struct SayHello {
    message: String,
}

impl TaskHandler for SayHello {
    const IDENTIFIER: &'static str = "say_hello";

    async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
        println!("Hello {} !", self.message);
        Ok::<(), String>(())
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let pg_options = PgConnectOptions::from_str(
        "postgres://postgres:root@localhost:5432",
    )?;

    let pg_pool = PgPoolOptions::new()
        .max_connections(5)
        .connect_with(pg_options)
        .await?;

    let worker = WorkerOptions::default()
        .concurrency(2)
        .schema("example_simple_worker")
        .define_job::<SayHello>()
        .pg_pool(pg_pool)
        .init()
        .await?;

    let helpers = worker.create_utils();

    helpers
        .add_job(
            SayHello {
                message: "world".to_string(),
            },
            JobSpec::default(),
        )
        .await?;

    worker.run().await?;

    Ok(())
}
```

## Shutdown

By default, the worker listens for OS shutdown signals such as Ctrl+C and
SIGTERM. When a shutdown signal arrives, in-flight jobs are given a grace
period to finish. The default grace period is 5 seconds, and shutdown-aborted
jobs are retried after a default delay of 30 seconds.
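
The default timings can be worked through with a small model. This sketch assumes that the retry delay is counted from the moment the grace period expires and the job is aborted; the source does not state the exact reference point, and `Outcome` and `outcome_after_shutdown` are hypothetical names used only for illustration.

```rust
use std::time::Duration;

/// Defaults quoted above.
const DEFAULT_GRACE_PERIOD: Duration = Duration::from_secs(5);
const DEFAULT_RETRY_DELAY: Duration = Duration::from_secs(30);

/// Hypothetical outcome for a job that was running when shutdown was requested.
#[derive(Debug, PartialEq)]
enum Outcome {
    /// The job finished within the grace period.
    Completed,
    /// The job was aborted; it is not retried before this offset, measured
    /// from the shutdown signal (assumption: delay starts at the abort).
    Interrupted { retry_not_before: Duration },
}

fn outcome_after_shutdown(
    remaining_work: Duration,
    grace_period: Duration,
    retry_delay: Duration,
) -> Outcome {
    if remaining_work <= grace_period {
        Outcome::Completed
    } else {
        Outcome::Interrupted {
            retry_not_before: grace_period + retry_delay,
        }
    }
}
```

Under these assumptions and the default settings, a job needing 3 more seconds completes, while a job needing 12 more seconds is aborted at the 5-second mark and becomes eligible for retry no earlier than 35 seconds after the signal.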

If the host application already owns shutdown handling, pass a custom shutdown
future and disable OS signal listeners:

```rust,ignore
use graphile_worker::{WorkerOptions, WorkerShutdownConfig};
use std::time::Duration;

let shutdown = WorkerShutdownConfig::default()
    .listen_os_shutdown_signals(false)
    .grace_period(Duration::from_secs(10))
    .interrupted_job_retry_delay(Duration::from_secs(30))
    .shutdown_signal(async {
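        // Placeholder that never completes. Replace it with a future that
        // completes when your application wants the worker to stop; an empty
        // block would complete immediately.
        std::future::pending::<()>().await;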
    });

let worker = WorkerOptions::default()
    .worker_shutdown(shutdown)
    .define_job::<SayHello>()
    .pg_pool(pg_pool)
    .init()
    .await?;

worker.run().await?;
```

The same settings can also be configured directly on `WorkerOptions` with
`listen_os_shutdown_signals`, `shutdown_signal`, `shutdown_grace_period`, and
`shutdown_interrupted_job_retry_delay`.

Next, see [Scheduling Jobs](../guides/scheduling-jobs.md) for job timing options and
[Worker Options](../configuration/worker-options.md) for more configuration
details.