graphile_worker 0.13.3

High performance Rust/PostgreSQL job queue (also suitable for getting jobs generated by PostgreSQL triggers/functions out into a different work queue)
Documentation
use super::*;

#[tokio::test]
async fn local_queue_can_run_multiple_local_queues() {
    with_test_db(|test_db| async move {
        MULTI_LOCAL_QUEUE_CALL_COUNT.reset().await;
        let utils = test_db.worker_utils();
        utils.migrate().await.expect("Failed to migrate");

        for i in 1..=12 {
            utils
                .add_job(MultiLocalQueueJob { id: i }, JobSpec::default())
                .await
                .expect("Failed to add job");
        }

        let init_count = Arc::new(AtomicU32::new(0));
        let plugin = LocalQueueInitCounterPlugin {
            counter: init_count.clone(),
        };

        let worker_fut = spawn_local({
            let database = test_db.database.clone();
            async move {
                Worker::options()
                    .database(database)
                    .concurrency(5)
                    .local_queue(LocalQueueConfig::default().with_size(3).with_queue_count(4))
                    .define_job::<MultiLocalQueueJob>()
                    .add_plugin(plugin)
                    .init()
                    .await
                    .expect("Failed to create worker")
                    .run()
                    .await
                    .expect("Failed to run worker");
            }
        });

        let start_time = Instant::now();
        while MULTI_LOCAL_QUEUE_CALL_COUNT.get().await < 12 {
            if start_time.elapsed().as_secs() > 10 {
                worker_fut.abort();
                panic!(
                    "All jobs should have been executed by now, got {}",
                    MULTI_LOCAL_QUEUE_CALL_COUNT.get().await
                );
            }
            sleep(Duration::from_millis(50)).await;
        }

        assert_eq!(
            init_count.load(Ordering::SeqCst),
            4,
            "A worker with queue_count=4 should start four local queues"
        );

        worker_fut.abort();
    })
    .await;
}

#[tokio::test]
async fn local_queue_count_is_limited_by_concurrency() {
    with_test_db(|test_db| async move {
        let utils = test_db.worker_utils();
        utils.migrate().await.expect("Failed to migrate");

        let init_count = Arc::new(AtomicU32::new(0));
        let plugin = LocalQueueInitCounterPlugin {
            counter: init_count.clone(),
        };

        let worker_fut = spawn_local({
            let database = test_db.database.clone();
            async move {
                Worker::options()
                    .database(database)
                    .concurrency(2)
                    .local_queue(LocalQueueConfig::default().with_size(3).with_queue_count(4))
                    .define_job::<MultiLocalQueueJob>()
                    .add_plugin(plugin)
                    .init()
                    .await
                    .expect("Failed to create worker")
                    .run()
                    .await
                    .expect("Failed to run worker");
            }
        });

        let start_time = Instant::now();
        while init_count.load(Ordering::SeqCst) < 2 {
            if start_time.elapsed().as_secs() > 5 {
                worker_fut.abort();
                panic!(
                    "Expected two local queues to initialize, got {}",
                    init_count.load(Ordering::SeqCst)
                );
            }
            sleep(Duration::from_millis(50)).await;
        }

        assert_eq!(
            init_count.load(Ordering::SeqCst),
            2,
            "queue_count should be capped at worker concurrency"
        );

        worker_fut.abort();
    })
    .await;
}