graphile_worker 0.13.0

High performance Rust/PostgreSQL job queue (also suitable for getting jobs generated by PostgreSQL triggers/functions out into a different work queue)
Documentation
#[cfg(feature = "driver-sqlx")]
use graphile_worker_database::DbError;
use graphile_worker_database::{DbExecutor, DbValue};
use indoc::formatdoc;

use crate::errors::Result;

use crate::Job;

pub struct FailedJob<'a> {
    pub job: &'a Job,
    pub error: &'a str,
}

#[cfg(feature = "driver-sqlx")]
async fn fail_jobs_sqlx(
    pool: &sqlx::PgPool,
    jobs: &[FailedJob<'_>],
    escaped_schema: &str,
    worker_id: &str,
) -> Result<()> {
    let job_ids: Vec<i64> = jobs.iter().map(|job| *job.job.id()).collect();
    let errors: Vec<&str> = jobs.iter().map(|job| job.error).collect();
    let has_queues = jobs.iter().any(|job| job.job.job_queue_id().is_some());

    if has_queues {
        let sql = formatdoc!(
            r#"
                WITH input AS (
                    SELECT *
                    FROM unnest($1::bigint[], $2::text[]) AS input(id, error)
                ), j AS (
                    UPDATE {escaped_schema}._private_jobs AS jobs
                    SET
                        last_error = input.error,
                        run_at = greatest(now(), jobs.run_at) + (exp(least(jobs.attempts, 10)) * interval '1 second'),
                        locked_by = NULL,
                        locked_at = NULL
                    FROM input
                    WHERE jobs.id = input.id
                        AND jobs.locked_by = $3::text
                    RETURNING jobs.job_queue_id
                )
                UPDATE {escaped_schema}._private_job_queues AS job_queues
                SET locked_by = NULL, locked_at = NULL
                FROM j
                WHERE job_queues.id = j.job_queue_id
                    AND job_queues.locked_by = $3::text;
            "#
        );

        sqlx::query(&sql)
            .bind(&job_ids)
            .bind(&errors)
            .bind(worker_id)
            .execute(pool)
            .await
            .map_err(DbError::from)?;
    } else {
        let sql = formatdoc!(
            r#"
                WITH input AS (
                    SELECT *
                    FROM unnest($1::bigint[], $2::text[]) AS input(id, error)
                )
                UPDATE {escaped_schema}._private_jobs AS jobs
                SET
                    last_error = input.error,
                    run_at = greatest(now(), jobs.run_at) + (exp(least(jobs.attempts, 10)) * interval '1 second'),
                    locked_by = NULL,
                    locked_at = NULL
                FROM input
                WHERE jobs.id = input.id
                    AND jobs.locked_by = $3::text;
            "#
        );

        sqlx::query(&sql)
            .bind(&job_ids)
            .bind(&errors)
            .bind(worker_id)
            .execute(pool)
            .await
            .map_err(DbError::from)?;
    }

    Ok(())
}

#[tracing::instrument(skip_all, err, fields(otel.kind="client", db.system="postgresql"))]
pub async fn fail_job(
    executor: &impl DbExecutor,
    job: &Job,
    escaped_schema: &str,
    worker_id: &str,
    message: &str,
    replacement_payload: Option<serde_json::Value>,
) -> Result<()> {
    if job.job_queue_id().is_some() {
        let sql = formatdoc!(
            r#"
                with j as (
                    update {escaped_schema}._private_jobs as jobs
                        set
                            last_error = $2::text,
                            run_at = greatest(now(), run_at) + (exp(least(attempts, 10)) * interval '1 second'),
                            locked_by = null,
                            locked_at = null,
                            payload = coalesce($4::json, jobs.payload)
                        where id = $1::bigint and locked_by = $3::text
                        returning *
                )
                update {escaped_schema}._private_job_queues as job_queues
                    set locked_by = null, locked_at = null
                    from j
                    where job_queues.id = j.job_queue_id and job_queues.locked_by = $3::text;
            "#
        );

        executor
            .execute(
                &sql,
                vec![
                    DbValue::I64(*job.id()),
                    DbValue::Text(message.to_string()),
                    DbValue::Text(worker_id.to_string()),
                    DbValue::JsonOpt(replacement_payload.clone()),
                ]
                .into(),
            )
            .await?;
    } else {
        let sql = format!(
            r#"
                update {escaped_schema}._private_jobs as jobs
                    set
                        last_error = $2::text,
                        run_at = greatest(now(), run_at) + (exp(least(attempts, 10)) * interval '1 second'),
                        locked_by = null,
                        locked_at = null,
                        payload = coalesce($4::json, jobs.payload)
                    where id = $1::bigint and locked_by = $3::text;
            "#
        );

        executor
            .execute(
                &sql,
                vec![
                    DbValue::I64(*job.id()),
                    DbValue::Text(message.to_string()),
                    DbValue::Text(worker_id.to_string()),
                    DbValue::JsonOpt(replacement_payload),
                ]
                .into(),
            )
            .await?;
    }

    Ok(())
}

#[tracing::instrument(skip_all, err, fields(otel.kind="client", db.system="postgresql"))]
pub async fn fail_jobs(
    executor: &impl DbExecutor,
    jobs: &[FailedJob<'_>],
    escaped_schema: &str,
    worker_id: &str,
) -> Result<()> {
    if jobs.is_empty() {
        return Ok(());
    }

    #[cfg(feature = "driver-sqlx")]
    if let Some(pool) = executor.try_sqlx_pool() {
        return fail_jobs_sqlx(pool, jobs, escaped_schema, worker_id).await;
    }

    let job_ids: Vec<i64> = jobs.iter().map(|job| *job.job.id()).collect();
    let errors: Vec<String> = jobs.iter().map(|job| job.error.to_string()).collect();
    let has_queues = jobs.iter().any(|job| job.job.job_queue_id().is_some());

    if has_queues {
        let sql = formatdoc!(
            r#"
                WITH input AS (
                    SELECT *
                    FROM unnest($1::bigint[], $2::text[]) AS input(id, error)
                ), j AS (
                    UPDATE {escaped_schema}._private_jobs AS jobs
                    SET
                        last_error = input.error,
                        run_at = greatest(now(), jobs.run_at) + (exp(least(jobs.attempts, 10)) * interval '1 second'),
                        locked_by = NULL,
                        locked_at = NULL
                    FROM input
                    WHERE jobs.id = input.id
                        AND jobs.locked_by = $3::text
                    RETURNING jobs.job_queue_id
                )
                UPDATE {escaped_schema}._private_job_queues AS job_queues
                SET locked_by = NULL, locked_at = NULL
                FROM j
                WHERE job_queues.id = j.job_queue_id
                    AND job_queues.locked_by = $3::text;
            "#
        );

        executor
            .execute(
                &sql,
                vec![
                    DbValue::I64Array(job_ids),
                    DbValue::TextArray(errors),
                    DbValue::Text(worker_id.to_string()),
                ]
                .into(),
            )
            .await?;
    } else {
        let sql = formatdoc!(
            r#"
                WITH input AS (
                    SELECT *
                    FROM unnest($1::bigint[], $2::text[]) AS input(id, error)
                )
                UPDATE {escaped_schema}._private_jobs AS jobs
                SET
                    last_error = input.error,
                    run_at = greatest(now(), jobs.run_at) + (exp(least(jobs.attempts, 10)) * interval '1 second'),
                    locked_by = NULL,
                    locked_at = NULL
                FROM input
                WHERE jobs.id = input.id
                    AND jobs.locked_by = $3::text;
            "#
        );

        executor
            .execute(
                &sql,
                vec![
                    DbValue::I64Array(job_ids),
                    DbValue::TextArray(errors),
                    DbValue::Text(worker_id.to_string()),
                ]
                .into(),
            )
            .await?;
    }

    Ok(())
}