// graphile_worker 0.13.3
//
// High performance Rust/PostgreSQL job queue (also suitable for getting jobs generated by PostgreSQL triggers/functions out into a different work queue)
// Documentation
use super::*;

#[tokio::test]
async fn test_batchers_emit_lifecycle_hooks() {
    with_test_db(|test_db| async move {
        let utils = test_db.worker_utils();
        utils.migrate().await.expect("Failed to migrate");

        let attempts = CompletedCounter::new();
        let plugin = BatcherHooksPlugin::new();
        let hook_counters = plugin.counters();

        let worker = Arc::new(
            Worker::options()
                .database(test_db.database.clone())
                .concurrency(4)
                .poll_interval(Duration::from_millis(50))
                .complete_job_batch_delay(Duration::from_millis(10))
                .fail_job_batch_delay(Duration::from_millis(10))
                .add_extension(attempts.clone())
                .add_plugin(plugin)
                .define_job::<SuccessJob>()
                .define_job::<FailJob>()
                .init()
                .await
                .expect("Failed to create worker"),
        );

        let worker_clone = worker.clone();
        let worker_handle = spawn_local(async move {
            worker_clone.run().await.expect("Failed to run worker");
        });

        utils
            .add_job(SuccessJob { id: 1 }, JobSpec::default())
            .await
            .expect("Failed to add success job");
        utils
            .add_job(
                SuccessJob { id: 2 },
                JobSpec::builder().queue_name("hooked_complete_queue").build(),
            )
            .await
            .expect("Failed to add queued success job");
        utils
            .add_job(FailJob { id: 1 }, JobSpec::builder().max_attempts(3).build())
            .await
            .expect("Failed to add retryable fail job");
        utils
            .add_job(
                FailJob { id: 2 },
                JobSpec::builder()
                    .queue_name("hooked_fail_queue")
                    .max_attempts(1)
                    .build(),
            )
            .await
            .expect("Failed to add permanent fail job");

        let start = Instant::now();
        while hook_counters.complete.load(Ordering::SeqCst) < 2
            || hook_counters.fail.load(Ordering::SeqCst) < 1
            || hook_counters.permanent.load(Ordering::SeqCst) < 1
        {
            if start.elapsed() > Duration::from_secs(5) {
                panic!(
                    "Batch hooks should have fired by now, complete: {}, fail: {}, permanent: {}, attempts: {}",
                    hook_counters.complete.load(Ordering::SeqCst),
                    hook_counters.fail.load(Ordering::SeqCst),
                    hook_counters.permanent.load(Ordering::SeqCst),
                    attempts.get()
                );
            }
            sleep(Duration::from_millis(50)).await;
        }

        assert_eq!(hook_counters.complete.load(Ordering::SeqCst), 2);
        assert_eq!(hook_counters.fail.load(Ordering::SeqCst), 1);
        assert_eq!(hook_counters.permanent.load(Ordering::SeqCst), 1);

        worker.request_shutdown();
        let _ = worker_handle.await;
    })
    .await;
}