graphile_worker 0.13.3

High-performance Rust/PostgreSQL job queue (also suitable for getting jobs generated by PostgreSQL triggers/functions out into a different work queue)
Documentation
use super::*;

/// Checks that jobs picked up by a worker with a local queue become unlocked
/// again once the worker is shut down.
///
/// Steps:
/// 1. Enqueue ten `ShutdownJob`s and confirm all ten are present.
/// 2. Start a worker with concurrency 2 and a local queue of size 20.
/// 3. Wait until at least two jobs report a `locked_by` value.
/// 4. Request shutdown and require the worker task to finish within 10 seconds.
/// 5. Require at least eight jobs to be unlocked and `SHUTDOWN_CALL_COUNT` to
///    still be zero.
#[tokio::test]
async fn local_queue_returns_jobs_on_shutdown() {
    with_test_db(|test_db| async move {
        SHUTDOWN_CALL_COUNT.reset().await;
        let utils = test_db.worker_utils();
        utils.migrate().await.expect("Failed to migrate");

        for i in 1..=10 {
            utils
                .add_job(ShutdownJob { id: i }, JobSpec::default())
                .await
                .expect("Failed to add job");
        }

        let initial_jobs = test_db.get_jobs().await;
        assert_eq!(initial_jobs.len(), 10, "Should have 10 jobs initially");

        // The worker is shared through an `Arc` so the test can still call
        // `request_shutdown` while the spawned task drives `run`.
        let worker = Arc::new(
            Worker::options()
                .database(test_db.database.clone())
                .concurrency(2)
                .local_queue(LocalQueueConfig::builder().size(20).build())
                .listen_os_shutdown_signals(false)
                .define_job::<ShutdownJob>()
                .init()
                .await
                .expect("Failed to create worker"),
        );

        let worker_for_run = Arc::clone(&worker);
        let worker_fut = spawn_local(async move {
            // The outcome of `run` is deliberately ignored; the test only
            // observes whether the task finishes.
            let _ = worker_for_run.run().await;
        });

        wait_for_jobs(
            &test_db,
            Duration::from_secs(5),
            Duration::from_millis(100),
            "Jobs should be locked by the worker before shutdown",
            |jobs| jobs.iter().filter(|j| j.locked_by.is_some()).count() >= 2,
        )
        .await;

        worker.request_shutdown();

        // Compare whole `Duration`s: `as_secs()` truncates, which would let the
        // wait run to 11 seconds before the guard fires.
        let shutdown_limit = Duration::from_secs(10);
        let start_time = Instant::now();
        while !worker_fut.is_finished() {
            if start_time.elapsed() > shutdown_limit {
                worker_fut.abort();
                panic!("Worker should have shut down by now");
            }
            sleep(Duration::from_millis(100)).await;
        }

        let remaining_jobs = wait_for_jobs(
            &test_db,
            Duration::from_secs(5),
            Duration::from_millis(100),
            "Most jobs should be returned to the queue",
            |jobs| jobs.iter().filter(|j| j.locked_by.is_none()).count() >= 8,
        )
        .await;
        let unlocked_jobs: Vec<_> = remaining_jobs
            .iter()
            .filter(|j| j.locked_by.is_none())
            .collect();

        // With concurrency 2, up to two jobs may have been in progress at
        // shutdown, hence the threshold of eight rather than ten.
        assert!(
            unlocked_jobs.len() >= 8,
            "Most jobs should be returned to the queue (got {} unlocked out of {})",
            unlocked_jobs.len(),
            remaining_jobs.len()
        );

        assert_eq!(
            SHUTDOWN_CALL_COUNT.get().await,
            0,
            "No jobs should have completed (they take 10s each)"
        );
    })
    .await;
}

/// Checks that a local queue configured with a TTL gives jobs back once the
/// TTL has passed.
///
/// Flow: enqueue ten `TtlExpiryJob`s, run a worker with concurrency 1 whose
/// local queue has size 20 and a 500 ms TTL, poll until some job is locked,
/// then poll until at least eight jobs are unlocked. Each polling phase gives
/// up after five seconds and leaves the verdict to the assertion that follows.
#[tokio::test]
async fn local_queue_returns_jobs_on_ttl_expiry() {
    with_test_db(|test_db| async move {
        TTL_CALL_COUNT.reset().await;
        let utils = test_db.worker_utils();
        utils.migrate().await.expect("Failed to migrate");

        for job_id in 1..=10 {
            let enqueued = utils
                .add_job(TtlExpiryJob { id: job_id }, JobSpec::default())
                .await;
            enqueued.expect("Failed to add job");
        }

        let queued_before_start = test_db.get_jobs().await.len();
        assert_eq!(queued_before_start, 10, "Should have 10 jobs initially");

        let queue_config = LocalQueueConfig::builder()
            .size(20)
            .ttl(Duration::from_millis(500))
            .build();
        let built_worker = Worker::options()
            .database(test_db.database.clone())
            .concurrency(1)
            .local_queue(queue_config)
            .listen_os_shutdown_signals(false)
            .define_job::<TtlExpiryJob>()
            .init()
            .await
            .expect("Failed to create worker");
        let worker = Arc::new(built_worker);

        let runner = Arc::clone(&worker);
        let worker_fut = spawn_local(async move {
            let _ = runner.run().await;
        });

        // Both polling phases share the same cadence and time budget.
        let poll_interval = Duration::from_millis(100);
        let poll_budget = Duration::from_secs(5);

        // Phase 1: wait for the worker to lock at least one job.
        let lock_wait_began = Instant::now();
        let locked_seen = loop {
            let snapshot = test_db.get_jobs().await;
            let locked_now = snapshot
                .iter()
                .filter(|job| job.locked_by.is_some())
                .count();

            if locked_now == 0 && lock_wait_began.elapsed() <= poll_budget {
                sleep(poll_interval).await;
                continue;
            }
            break locked_now;
        };

        assert!(
            locked_seen > 0,
            "At least one job should be locked by worker"
        );

        // Phase 2: wait for most jobs to show up as unlocked again.
        let ttl_wait_began = Instant::now();
        let jobs_after_ttl = loop {
            let snapshot = test_db.get_jobs().await;
            let enough_unlocked = snapshot
                .iter()
                .filter(|job| job.locked_by.is_none())
                .count()
                >= 8;

            if !enough_unlocked && ttl_wait_began.elapsed() <= poll_budget {
                sleep(poll_interval).await;
                continue;
            }
            break snapshot;
        };

        let unlocked_total = jobs_after_ttl
            .iter()
            .filter(|job| job.locked_by.is_none())
            .count();

        assert!(
            unlocked_total >= 8,
            "Most jobs should be returned after TTL expiry (got {} unlocked out of {})",
            unlocked_total,
            jobs_after_ttl.len()
        );

        // Tear down: ask the worker to stop, then cancel the task without
        // waiting for it.
        worker.request_shutdown();
        worker_fut.abort();
    })
    .await;
}

/// Checks that the worker task terminates after shutdown is requested while
/// work is under way.
///
/// Three `ReleaseWaitsJob`s are enqueued and a worker with concurrency 2 and a
/// local queue of size 10 is started. Once `RELEASE_WAITS_CALL_COUNT` becomes
/// non-zero, shutdown is requested and the spawned worker task must finish
/// within 10 seconds.
///
/// `RELEASE_WAITS_COMPLETED` is reset here but not inspected by this test.
#[tokio::test]
async fn local_queue_release_waits_for_run_loop() {
    with_test_db(|test_db| async move {
        RELEASE_WAITS_CALL_COUNT.reset().await;
        RELEASE_WAITS_COMPLETED.reset().await;
        let utils = test_db.worker_utils();
        utils.migrate().await.expect("Failed to migrate");

        for i in 1..=3 {
            utils
                .add_job(ReleaseWaitsJob { id: i }, JobSpec::default())
                .await
                .expect("Failed to add job");
        }

        // The worker is shared through an `Arc` so the test can still call
        // `request_shutdown` while the spawned task drives `run`.
        let worker = Arc::new(
            Worker::options()
                .database(test_db.database.clone())
                .concurrency(2)
                .local_queue(LocalQueueConfig::builder().size(10).build())
                .listen_os_shutdown_signals(false)
                .define_job::<ReleaseWaitsJob>()
                .init()
                .await
                .expect("Failed to create worker"),
        );

        let worker_for_run = Arc::clone(&worker);
        let worker_fut = spawn_local(async move {
            // The outcome of `run` is deliberately ignored; the test only
            // observes whether the task finishes.
            let _ = worker_for_run.run().await;
        });

        // Hold off on shutdown until the call counter shows activity.
        let start = Instant::now();
        while RELEASE_WAITS_CALL_COUNT.get().await == 0 {
            if start.elapsed() > Duration::from_secs(5) {
                panic!("At least one job should have started");
            }
            sleep(Duration::from_millis(50)).await;
        }

        worker.request_shutdown();

        // Compare whole `Duration`s, matching the wait above: `as_secs()`
        // truncates, which would let the wait run to 11 seconds.
        let start = Instant::now();
        while !worker_fut.is_finished() {
            if start.elapsed() > Duration::from_secs(10) {
                worker_fut.abort();
                panic!("Worker should have finished shutdown by now");
            }
            sleep(Duration::from_millis(50)).await;
        }

        // Always true once the loop above exits; kept to state the
        // postcondition explicitly.
        assert!(
            worker_fut.is_finished(),
            "Worker future should be finished after shutdown"
        );
    })
    .await;
}