graphile_worker 0.13.2

High performance Rust/PostgreSQL job queue (also suitable for getting jobs generated by PostgreSQL triggers/functions out into a different work queue)
Documentation
use graphile_worker_database::{Database, DbExecutor, DbValue, Schema};
use indoc::formatdoc;
use tracing::error;

use super::super::CompletionRequest;

pub(super) async fn complete_jobs_with_queues(
    database: &Database,
    schema: &Schema,
    worker_id: &str,
    job_ids: Vec<i64>,
) -> bool {
    if job_ids.is_empty() {
        return false;
    }

    let jobs = schema.private_table("jobs");
    let job_queues = schema.private_table("job_queues");
    let sql = formatdoc!(
        r#"
            WITH j AS (
                DELETE FROM {jobs}
                WHERE id = ANY($1::bigint[])
                RETURNING *
            )
            UPDATE {job_queues} AS job_queues
            SET locked_by = NULL, locked_at = NULL
            FROM j
            WHERE job_queues.id = j.job_queue_id AND job_queues.locked_by = $2::text
        "#
    );

    match database
        .execute(
            &sql,
            vec![
                DbValue::I64Array(job_ids),
                DbValue::Text(worker_id.to_string()),
            ]
            .into(),
        )
        .await
    {
        Ok(_) => true,
        Err(error) => {
            error!(?error, "Failed to complete jobs with queue");
            false
        }
    }
}

pub(super) async fn complete_one_job_with_queue(
    req: &CompletionRequest,
    database: &Database,
    schema: &Schema,
    worker_id: &str,
) -> bool {
    let jobs = schema.private_table("jobs");
    let job_queues = schema.private_table("job_queues");
    let sql = formatdoc!(
        r#"
            WITH j AS (
                DELETE FROM {jobs}
                WHERE id = $1
                RETURNING *
            )
            UPDATE {job_queues} AS job_queues
            SET locked_by = NULL, locked_at = NULL
            FROM j
            WHERE job_queues.id = j.job_queue_id AND job_queues.locked_by = $2::text
        "#
    );

    if let Err(error) = database
        .execute(
            &sql,
            vec![
                DbValue::I64(req.job_id),
                DbValue::Text(worker_id.to_string()),
            ]
            .into(),
        )
        .await
    {
        error!(
            ?error,
            job_id = req.job_id,
            "Failed to complete job directly (with queue)"
        );
        return false;
    }

    true
}