graphile_worker 0.13.3

High-performance Rust/PostgreSQL job queue (also suitable for getting jobs generated by PostgreSQL triggers/functions out into a different work queue)
Documentation
use std::sync::Arc;

use graphile_worker_job::Job;
use graphile_worker_lifecycle_hooks::{JobFailContext, JobPermanentlyFailContext};
use serde_json::Value;
use tracing::{error, warn};

use crate::batcher::FailureRequest;
use graphile_worker_queries::fail_job::single::fail_job;

use super::super::errors::{ReleaseJobError, RunJobError};
use super::super::recovery_release::recover_shutdown_aborted_job;
use super::super::WorkerRunner;

/// Releases a job whose task run ended in an error.
///
/// A [`RunJobError::TaskAborted`] error is not recorded through the normal
/// failure path. Instead the job is handed to `recover_shutdown_aborted_job`
/// together with `worker.shutdown_config.interrupted_job_retry_delay`.
///
/// Any other error is logged and then handled in one of two ways:
/// * If the error carries no replacement payload and the worker has a
///   `failure_batcher`, a [`FailureRequest`] is queued on that batcher and the
///   function returns right away. On that path this function neither writes
///   to the database directly nor emits hooks.
/// * Otherwise the failure is written with `fail_job` (via
///   `persist_failed_job`), and the lifecycle failure hooks are emitted
///   afterwards.
///
/// # Errors
///
/// Returns [`ReleaseJobError`] if shutdown recovery fails (converted with `?`)
/// or if the direct `fail_job` write fails. The batcher path always returns
/// `Ok(())` from here.
pub(super) async fn release_failed_job(
    error: RunJobError,
    job: Arc<Job>,
    worker: &WorkerRunner,
) -> Result<(), ReleaseJobError> {
    // Shutdown-aborted runs go through recovery, not the failure bookkeeping below.
    if matches!(error, RunJobError::TaskAborted) {
        recover_shutdown_aborted_job(
            worker,
            job,
            worker.shutdown_config.interrupted_job_retry_delay,
        )
        .await?;
        return Ok(());
    }

    let persisted_error = error.persisted_error();
    // NOTE(review): assumes `attempts()` already counts the run that just
    // failed, so the job is retried only while it is below `max_attempts` — confirm.
    let will_retry = job.attempts() < job.max_attempts();
    let replacement_payload = error.replacement_payload().cloned();

    log_failed_job(&error, &job, will_retry);

    // `FailureRequest` has no field for a replacement payload, so batching is
    // only possible when there is none. Otherwise fall through to the direct
    // `fail_job` write, which accepts the payload.
    if let (Some(batcher), None) = (&worker.failure_batcher, &replacement_payload) {
        let request = FailureRequest {
            job,
            error: persisted_error,
            will_retry,
        };
        batcher.fail(request).await;
        return Ok(());
    }

    persist_failed_job(&job, worker, &persisted_error, replacement_payload).await?;
    emit_failure_hook(job, worker, persisted_error, will_retry).await;
    Ok(())
}

/// Emits a tracing event describing a failed job run.
///
/// The event records the error (Debug), task id, payload (Debug) and job id.
/// It is logged at `error` level with "Job max attempts reached" when
/// `will_retry` is false. Otherwise it is logged at `warn` level with
/// "Failed task".
fn log_failed_job(error: &RunJobError, job: &Job, will_retry: bool) {
    // Terminal failure: no further attempts will be made, so escalate to `error`.
    if !will_retry {
        error!(
            error = ?error,
            task_id = job.task_id(),
            payload = ?job.payload(),
            job_id = job.id(),
            "Job max attempts reached"
        );
        return;
    }

    warn!(
        error = ?error,
        task_id = job.task_id(),
        payload = ?job.payload(),
        job_id = job.id(),
        "Failed task"
    );
}

/// Records a job failure directly in the database through `fail_job`.
///
/// Forwards the worker's `database`, `schema` and `worker_id`, the error
/// message, and the optional replacement payload. Any success value returned
/// by `fail_job` is discarded.
///
/// # Errors
///
/// If `fail_job` fails, an error-level event "Failed to persist failed job" is
/// logged, and the underlying error is returned wrapped in [`ReleaseJobError`]
/// along with the job id.
async fn persist_failed_job(
    job: &Arc<Job>,
    worker: &WorkerRunner,
    error: &str,
    replacement_payload: Option<Value>,
) -> Result<(), ReleaseJobError> {
    let outcome = fail_job(
        &worker.database,
        job,
        &worker.schema,
        &worker.worker_id,
        error,
        replacement_payload,
    )
    .await;

    match outcome {
        Ok(_) => Ok(()),
        Err(source) => {
            error!(error = ?source, job_id = job.id(), "Failed to persist failed job");
            Err(ReleaseJobError {
                job_id: *job.id(),
                source,
            })
        }
    }
}

/// Notifies the worker's lifecycle hooks that a job run failed.
///
/// Returns immediately when `worker.hooks` is empty. Otherwise emits a
/// [`JobFailContext`] when the job will be retried, or a
/// [`JobPermanentlyFailContext`] when it will not. Each context carries the job,
/// a clone of the worker id, and the error message.
async fn emit_failure_hook(job: Arc<Job>, worker: &WorkerRunner, error: String, will_retry: bool) {
    // Avoid cloning the worker id and building a context nobody will receive.
    if worker.hooks.is_empty() {
        return;
    }

    let hooks = &worker.hooks;
    let worker_id = worker.worker_id.clone();

    if !will_retry {
        hooks
            .emit(JobPermanentlyFailContext {
                job,
                worker_id,
                error,
            })
            .await;
        return;
    }

    hooks
        .emit(JobFailContext {
            job,
            worker_id,
            error,
            will_retry,
        })
        .await;
}