graphile_worker 0.13.3

High-performance Rust/PostgreSQL job queue (also suitable for getting jobs generated by PostgreSQL triggers/functions out into a different work queue)
Documentation
use std::sync::Arc;
use std::time::{Duration, Instant};

use graphile_worker_job::Job;
use graphile_worker_lifecycle_hooks::{
    AfterJobRunContext, BeforeJobRunContext, HookResult, JobStartContext,
};
use tracing::debug;

use crate::runner::errors::RunJobError;
use crate::runner::task_execution::run_job;
use crate::runner::WorkerRunner;
use crate::streams::job_signal::JobSignalSource;

/// Runs `job`, routing it through the worker's hooks when any are registered.
///
/// Returns the job outcome paired with a duration.
///
/// * When `worker.hooks.is_empty()`, the job is run directly via
///   `run_without_hooks`.
/// * Otherwise the `before_job_run` interception decides what happens:
///   - `HookResult::Continue`: the job runs via `run_with_after_hook`.
///   - `HookResult::Skip`: the job is not run; `Ok(())` and a zero duration
///     are returned.
///   - `HookResult::Fail(msg)`: the job is not run; a
///     `RunJobError::TaskError(msg)` and a zero duration are returned.
pub(super) async fn run_job_with_hooks(
    job: Arc<Job>,
    worker: &WorkerRunner,
    source: &JobSignalSource,
) -> (Result<(), RunJobError>, Duration) {
    if worker.hooks.is_empty() {
        run_without_hooks(job, worker, source).await
    } else {
        // The hook gets its own handle; `job` is still needed below.
        let decision = before_job_run(Arc::clone(&job), worker).await;

        match decision {
            HookResult::Fail(msg) => {
                debug!(
                    job_id = job.id(),
                    "Job failed by before_job_run hook: {}", msg
                );
                (Err(RunJobError::TaskError(msg)), Duration::ZERO)
            }
            HookResult::Skip => {
                debug!(job_id = job.id(), "Job skipped by before_job_run hook");
                (Ok(()), Duration::ZERO)
            }
            HookResult::Continue => run_with_after_hook(job, worker, source).await,
        }
    }
}

/// Runs `job` through `run_job` without invoking any hooks.
///
/// Returns the result of `run_job` together with the time elapsed between
/// just before the call and its completion.
async fn run_without_hooks(
    job: Arc<Job>,
    worker: &WorkerRunner,
    source: &JobSignalSource,
) -> (Result<(), RunJobError>, Duration) {
    let timer = Instant::now();
    let outcome = run_job(job, worker, source).await;
    let elapsed = timer.elapsed();

    (outcome, elapsed)
}

/// Passes a `BeforeJobRunContext` for `job` to the worker's hooks via
/// `intercept` and returns the resulting `HookResult`.
///
/// The context carries the job handle, a clone of the worker id and a clone
/// of the job payload.
async fn before_job_run(job: Arc<Job>, worker: &WorkerRunner) -> HookResult {
    // Clone the payload while `job` is still borrowed, then move the handle
    // into the context.
    let payload = job.payload().clone();
    let worker_id = worker.worker_id.clone();

    let context = BeforeJobRunContext {
        job,
        worker_id,
        payload,
    };

    worker.hooks.intercept(context).await
}

/// Emits a `JobStartContext`, runs `job`, then lets the `after_job_run`
/// interception decide the final result.
///
/// The returned duration covers only the `run_job` call; the hook calls
/// before and after it are not timed.
///
/// The final result depends on the `HookResult` from `after_job_run`:
/// * `Continue`: the result of `run_job` is returned unchanged.
/// * `Skip`: `Ok(())` is returned in place of the result of `run_job`.
/// * `Fail(msg)`: `RunJobError::TaskError(msg)` is returned in place of the
///   result of `run_job`.
async fn run_with_after_hook(
    job: Arc<Job>,
    worker: &WorkerRunner,
    source: &JobSignalSource,
) -> (Result<(), RunJobError>, Duration) {
    let start_context = JobStartContext {
        job: Arc::clone(&job),
        worker_id: worker.worker_id.clone(),
    };
    worker.hooks.emit(start_context).await;

    let timer = Instant::now();
    let outcome = run_job(Arc::clone(&job), worker, source).await;
    let elapsed = timer.elapsed();

    let verdict = after_job_run(job, worker, &outcome, elapsed).await;
    let final_result = match verdict {
        HookResult::Continue => outcome,
        HookResult::Skip => Ok(()),
        HookResult::Fail(msg) => Err(RunJobError::TaskError(msg)),
    };

    (final_result, elapsed)
}

/// Passes an `AfterJobRunContext` for `job` to the worker's hooks via
/// `intercept` and returns the resulting `HookResult`.
///
/// The context carries the job handle, a clone of the worker id, the measured
/// `duration`, and a summary of `job_result`: `Ok(())` on success, or the
/// error's `Debug` rendering as a `String` on failure.
async fn after_job_run(
    job: Arc<Job>,
    worker: &WorkerRunner,
    job_result: &Result<(), RunJobError>,
    duration: Duration,
) -> HookResult {
    // `job_result` is only borrowed, so the error is rendered to a string
    // rather than moved into the context.
    let result = match job_result {
        Ok(()) => Ok(()),
        Err(err) => Err(format!("{err:?}")),
    };

    let context = AfterJobRunContext {
        job,
        worker_id: worker.worker_id.clone(),
        result,
        duration,
    };

    worker.hooks.intercept(context).await
}