graphile_worker 0.13.3

High-performance Rust/PostgreSQL job queue (also suitable for getting jobs generated by PostgreSQL triggers/functions out into a different work queue)
Documentation
use super::*;

/// Runs one worker configured with both a complete-job batch delay and a
/// fail-job batch delay, feeds it five always-succeeding and five
/// always-failing jobs, and checks that each handler ran exactly five times.
///
/// Handler invocations are tallied through two counters that are registered
/// as worker extensions and looked up by type inside the handlers.
#[tokio::test]
async fn test_both_batchers_together() {
    with_test_db(|test_db| async move {
        // Distinct newtypes so each counter can be registered as its own
        // extension and fetched by type from the handler context.
        #[derive(Clone, Debug)]
        struct SuccessCounter(CompletedCounter);

        #[derive(Clone, Debug)]
        struct FailCounter(CompletedCounter);

        /// Job whose handler always reports success.
        #[derive(Serialize, Deserialize)]
        struct MixedSuccessJob {
            id: u32,
        }

        impl TaskHandler for MixedSuccessJob {
            const IDENTIFIER: &'static str = "mixed_success_job";

            async fn run(self, ctx: WorkerContext) -> impl IntoTaskHandlerResult {
                // Record the invocation when the extension is present.
                if let Some(tally) = ctx.get_ext::<SuccessCounter>() {
                    tally.0.increment();
                }
                Result::<(), String>::Ok(())
            }
        }

        /// Job whose handler always reports an error naming its `id`.
        #[derive(Serialize, Deserialize)]
        struct MixedFailJob {
            id: u32,
        }

        impl TaskHandler for MixedFailJob {
            const IDENTIFIER: &'static str = "mixed_fail_job";

            async fn run(self, ctx: WorkerContext) -> impl IntoTaskHandlerResult {
                // Count the attempt before returning the error.
                if let Some(tally) = ctx.get_ext::<FailCounter>() {
                    tally.0.increment();
                }
                Result::<(), String>::Err(format!("Job {} failed", self.id))
            }
        }

        let utils = test_db.worker_utils();
        utils.migrate().await.expect("Failed to migrate");

        let success_counter = CompletedCounter::new();
        let fail_counter = CompletedCounter::new();

        let built_worker = Worker::options()
            .database(test_db.database.clone())
            .concurrency(8)
            .poll_interval(Duration::from_millis(50))
            .complete_job_batch_delay(Duration::from_millis(10))
            .fail_job_batch_delay(Duration::from_millis(10))
            .add_extension(SuccessCounter(success_counter.clone()))
            .add_extension(FailCounter(fail_counter.clone()))
            .define_job::<MixedSuccessJob>()
            .define_job::<MixedFailJob>()
            .init()
            .await
            .expect("Failed to create worker");
        let worker = Arc::new(built_worker);

        // Drive the worker in a background task; the second handle stays here
        // so shutdown can be requested once the assertions are done.
        let runner = Arc::clone(&worker);
        let worker_handle = spawn_local(async move {
            runner.run().await.expect("Failed to run worker");
        });

        // Enqueue the jobs interleaved: one succeeding, one failing per id.
        for job_id in 0..5 {
            utils
                .add_job(MixedSuccessJob { id: job_id }, JobSpec::default())
                .await
                .expect("Failed to add job");

            // NOTE(review): max_attempts(1) presumably prevents retries so the
            // fail counter stops at exactly five; confirm against the worker.
            let single_attempt = JobSpec::builder().max_attempts(1).build();
            utils
                .add_job(MixedFailJob { id: job_id }, single_attempt)
                .await
                .expect("Failed to add job");
        }

        // Poll until both counters reach five, giving up after five seconds.
        let started = Instant::now();
        let time_limit = Duration::from_secs(5);
        let tick = Duration::from_millis(50);
        loop {
            if success_counter.get() >= 5 && fail_counter.get() >= 5 {
                break;
            }
            if started.elapsed() > time_limit {
                panic!(
                    "Jobs should have completed by now, success: {}, fail: {}",
                    success_counter.get(),
                    fail_counter.get()
                );
            }
            sleep(tick).await;
        }

        // Exact equality also catches a handler that ran more than five times.
        assert_eq!(success_counter.get(), 5);
        assert_eq!(fail_counter.get(), 5);

        // Ask the worker to stop and wait for its task; the join result is
        // deliberately discarded.
        worker.request_shutdown();
        let _ = worker_handle.await;
    })
    .await;
}