graphile_worker 0.13.3

High performance Rust/PostgreSQL job queue (also suitable for getting jobs generated by PostgreSQL triggers/functions out into a different work queue)
Documentation
use graphile_worker::{
    HookRegistry, IntoTaskHandlerResult, JobComplete, JobFail, JobPermanentlyFail, JobSpec, Plugin,
    TaskHandler, Worker, WorkerContext,
};
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::task::spawn_local;
use tokio::time::{sleep, Instant};

use crate::helpers::with_test_db;

mod helpers;

#[derive(Clone, Debug)]
struct CompletedCounter(Arc<AtomicU32>);

impl CompletedCounter {
    fn new() -> Self {
        Self(Arc::new(AtomicU32::new(0)))
    }

    fn increment(&self) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }

    fn get(&self) -> u32 {
        self.0.load(Ordering::SeqCst)
    }
}

#[derive(Debug, Default)]
struct BatcherHookCounters {
    complete: AtomicU32,
    fail: AtomicU32,
    permanent: AtomicU32,
}

#[derive(Clone, Debug)]
struct BatcherHooksPlugin {
    counters: Arc<BatcherHookCounters>,
}

impl BatcherHooksPlugin {
    fn new() -> Self {
        Self {
            counters: Arc::new(BatcherHookCounters::default()),
        }
    }

    fn counters(&self) -> Arc<BatcherHookCounters> {
        self.counters.clone()
    }
}

impl Plugin for BatcherHooksPlugin {
    fn register(self, hooks: &mut HookRegistry) {
        let counters = self.counters.clone();
        hooks.on(JobComplete, move |_ctx| {
            let counters = counters.clone();
            async move {
                counters.complete.fetch_add(1, Ordering::SeqCst);
            }
        });

        let counters = self.counters.clone();
        hooks.on(JobFail, move |_ctx| {
            let counters = counters.clone();
            async move {
                counters.fail.fetch_add(1, Ordering::SeqCst);
            }
        });

        let counters = self.counters.clone();
        hooks.on(JobPermanentlyFail, move |_ctx| {
            let counters = counters.clone();
            async move {
                counters.permanent.fetch_add(1, Ordering::SeqCst);
            }
        });
    }
}

#[derive(Serialize, Deserialize)]
struct SuccessJob {
    id: u32,
}

impl TaskHandler for SuccessJob {
    const IDENTIFIER: &'static str = "success_job";

    async fn run(self, ctx: WorkerContext) -> impl IntoTaskHandlerResult {
        if let Some(counter) = ctx.get_ext::<CompletedCounter>() {
            counter.increment();
        }
        Ok::<(), String>(())
    }
}

#[derive(Serialize, Deserialize)]
struct FailJob {
    id: u32,
}

impl TaskHandler for FailJob {
    const IDENTIFIER: &'static str = "fail_job";

    async fn run(self, ctx: WorkerContext) -> impl IntoTaskHandlerResult {
        if let Some(counter) = ctx.get_ext::<CompletedCounter>() {
            counter.increment();
        }
        Err::<(), String>(format!("Job {} failed", self.id))
    }
}

#[path = "batcher/completion.rs"]
mod completion;
#[path = "batcher/failure.rs"]
mod failure;
#[path = "batcher/hooks.rs"]
mod hooks;
#[path = "batcher/mixed.rs"]
mod mixed;