graphile_worker 0.13.3

High performance Rust/PostgreSQL job queue (also suitable for getting jobs generated by PostgreSQL triggers/functions out into a different work queue)
Documentation
#[derive(Debug, Default)]
struct HookCounters {
    worker_init: AtomicU32,
    worker_start: AtomicU32,
    worker_shutdown: AtomicU32,
    job_fetch: AtomicU32,
    job_start: AtomicU32,
    job_complete: AtomicU32,
    job_fail: AtomicU32,
    before_job_run: AtomicU32,
    skipped: AtomicU32,
    failed_by_hook: AtomicU32,
}

#[derive(Clone)]
struct TestHooksPlugin {
    counters: Arc<HookCounters>,
}

impl TestHooksPlugin {
    fn new() -> Self {
        Self {
            counters: Arc::new(HookCounters::default()),
        }
    }

    fn counters(&self) -> Arc<HookCounters> {
        self.counters.clone()
    }
}

impl Plugin for TestHooksPlugin {
    fn register(self, hooks: &mut HookRegistry) {
        let counters = self.counters.clone();
        hooks.on(WorkerInit, move |_ctx| {
            let counters = counters.clone();
            async move {
                counters.worker_init.fetch_add(1, Ordering::SeqCst);
            }
        });

        let counters = self.counters.clone();
        hooks.on(WorkerStart, move |_ctx| {
            let counters = counters.clone();
            async move {
                counters.worker_start.fetch_add(1, Ordering::SeqCst);
            }
        });

        let counters = self.counters.clone();
        hooks.on(WorkerShutdown, move |_ctx| {
            let counters = counters.clone();
            async move {
                counters.worker_shutdown.fetch_add(1, Ordering::SeqCst);
            }
        });

        let counters = self.counters.clone();
        hooks.on(JobFetch, move |_ctx| {
            let counters = counters.clone();
            async move {
                counters.job_fetch.fetch_add(1, Ordering::SeqCst);
            }
        });

        let counters = self.counters.clone();
        hooks.on(JobStart, move |_ctx| {
            let counters = counters.clone();
            async move {
                counters.job_start.fetch_add(1, Ordering::SeqCst);
            }
        });

        let counters = self.counters.clone();
        hooks.on(JobComplete, move |_ctx| {
            let counters = counters.clone();
            async move {
                counters.job_complete.fetch_add(1, Ordering::SeqCst);
            }
        });

        let counters = self.counters.clone();
        hooks.on(JobFail, move |_ctx| {
            let counters = counters.clone();
            async move {
                counters.job_fail.fetch_add(1, Ordering::SeqCst);
            }
        });

        let counters = self.counters.clone();
        hooks.on(BeforeJobRun, move |ctx| {
            let counters = counters.clone();
            async move {
                counters.before_job_run.fetch_add(1, Ordering::SeqCst);

                let should_skip = ctx
                    .payload
                    .get("skip")
                    .and_then(|v| v.as_bool())
                    .unwrap_or(false);

                let should_fail = ctx
                    .payload
                    .get("force_fail")
                    .and_then(|v| v.as_bool())
                    .unwrap_or(false);

                if should_skip {
                    counters.skipped.fetch_add(1, Ordering::SeqCst);
                    return HookResult::Skip;
                }

                if should_fail {
                    counters.failed_by_hook.fetch_add(1, Ordering::SeqCst);
                    return HookResult::Fail("Forced failure by hook".into());
                }

                HookResult::Continue
            }
        });
    }
}

#[derive(Serialize, Deserialize)]
struct TestJob {
    value: u32,
    #[serde(default)]
    skip: bool,
    #[serde(default)]
    force_fail: bool,
    #[serde(default)]
    should_error: bool,
}

impl TaskHandler for TestJob {
    const IDENTIFIER: &'static str = "test_hooks_job";

    async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
        if self.should_error {
            return Err("Task error".to_string());
        }
        Ok::<(), String>(())
    }
}

async fn wait_for_condition<F>(condition: F, timeout_secs: u64, msg: &str)
where
    F: Fn() -> bool,
{
    let start = Instant::now();
    while !condition() {
        if start.elapsed().as_secs() > timeout_secs {
            panic!("{}", msg);
        }
        sleep(Duration::from_millis(50)).await;
    }
}