graphile_worker 0.13.3

High performance Rust/PostgreSQL job queue (also suitable for getting jobs generated by PostgreSQL triggers/functions out into a different work queue)
Documentation
use super::*;

/// Regression test: a refetch delay must still be aborted when the worker's
/// concurrency (2) is lower than the configured max abort threshold (3).
///
/// The worker is started with a 30 s poll interval and a 30 s refetch delay,
/// so the five jobs queued below can only finish inside the 10 s budget if
/// the refetch delay is cut short. Per the failure message, the regression
/// being guarded against is a deadlock in which handlers block on oneshot
/// channels before enough pulses arrive to trigger the abort.
///
/// # Panics
///
/// Panics if migration, worker initialization, or job insertion fails, or if
/// fewer than five jobs have run once the 10 s budget is exhausted.
#[tokio::test]
async fn local_queue_refetch_delay_abort_with_low_concurrency() {
    with_test_db(|test_db| async move {
        REFETCH_ABORT_CALL_COUNT.reset().await;
        let utils = test_db.worker_utils();
        utils.migrate().await.expect("Failed to migrate");

        // The plugin shares this counter so the test can tell when the local
        // queue has come up before any job is inserted.
        let init_count = Arc::new(AtomicU32::new(0));
        let plugin = LocalQueueInitCounterPlugin {
            counter: init_count.clone(),
        };

        let worker = Arc::new(
            Worker::options()
                .database(test_db.database.clone())
                // Concurrency is deliberately below `with_max_abort_threshold(3)`.
                .concurrency(2)
                .poll_interval(Duration::from_secs(30))
                .local_queue(
                    LocalQueueConfig::default()
                        .with_size(10)
                        .with_refetch_delay(
                            RefetchDelayConfig::default()
                                .with_duration(Duration::from_secs(30))
                                .with_threshold(0)
                                .with_max_abort_threshold(3),
                        ),
                )
                .listen_os_shutdown_signals(false)
                .define_job::<RefetchAbortJob>()
                .add_plugin(plugin)
                .init()
                .await
                .expect("Failed to create worker"),
        );

        let worker_for_run = Arc::clone(&worker);
        let worker_fut = spawn_local(async move {
            // The run result is irrelevant here: the task is aborted at the
            // end of the test (or on timeout).
            let _ = worker_for_run.run().await;
        });

        wait_for_atomic_counter(
            &init_count,
            1,
            Duration::from_secs(5),
            Duration::from_millis(50),
            "Local queue should have initialized before adding jobs",
        )
        .await;

        // Insert the jobs one at a time, 50 ms apart, rather than as a batch.
        for i in 1..=5 {
            utils
                .add_job(RefetchAbortJob { id: i }, JobSpec::default())
                .await
                .expect("Failed to add job");
            sleep(Duration::from_millis(50)).await;
        }

        // Compare full `Duration`s: `as_secs()` truncates to whole seconds,
        // which would silently stretch a "10 second" budget to 11 seconds.
        let deadline = Duration::from_secs(10);
        let start_time = Instant::now();
        loop {
            // Sample the counter once per iteration so the failure message
            // reports exactly the value that failed the check.
            let executed = REFETCH_ABORT_CALL_COUNT.get().await;
            if executed >= 5 {
                break;
            }
            if start_time.elapsed() > deadline {
                worker_fut.abort();
                panic!(
                    "Jobs should have been executed (got {}). This would deadlock without the fix: \
                    with concurrency=2 and abort_threshold=3, handlers would block on oneshot \
                    channels before enough pulses could trigger the abort.",
                    executed
                );
            }
            sleep(Duration::from_millis(100)).await;
        }

        // The loop exits on `>= 5`; this additionally guards against any job
        // having been executed more than once.
        assert_eq!(
            REFETCH_ABORT_CALL_COUNT.get().await,
            5,
            "All 5 jobs should have been executed after refetch delay abort"
        );

        worker.request_shutdown();
        worker_fut.abort();
    })
    .await;
}