graphile_worker 0.13.3

High performance Rust/PostgreSQL job queue (also suitable for getting jobs generated by PostgreSQL triggers/functions out into a different work queue)
Documentation
use graphile_worker::{
    worker_utils::types::CleanupTask, IntoTaskHandlerResult, JobSpec, TaskHandler, WorkerContext,
};
use serde::{Deserialize, Serialize};

use crate::helpers::{with_test_db, StaticCounter};

/// Checks the effect of the `GcTaskIdentifiers` cleanup on the stored task
/// identifiers: after one of four queued jobs is completed, the test expects
/// that job's identifier to be gone while the other three remain.
#[tokio::test]
async fn cleanup_with_gc_task_identifiers() {
    with_test_db(|test_db| async move {
        let utils = test_db
            .create_worker_options()
            .init()
            .await
            .expect("Failed to create worker")
            .create_utils();

        let identifiers = ["job3", "test_job1", "test_job2", "test_job3"];

        // Queue one job per identifier, remembering the id of the "test_job2"
        // job: it is the only one completed before the cleanup runs.
        let mut job_to_complete = None;
        for identifier in &identifiers {
            let added = utils
                .add_raw_job(identifier, serde_json::json!({}), JobSpec::default())
                .await
                .expect("Failed to add job");

            if *identifier == "test_job2" {
                job_to_complete = Some(*added.id());
            }
        }

        if let Some(id) = job_to_complete {
            utils
                .complete_jobs(&[id])
                .await
                .expect("Failed to complete the job");
        }

        // Both sides of each comparison are sorted so the checks do not depend
        // on the order in which the database returns the identifiers.
        let mut found_before = test_db.get_tasks().await;
        found_before.sort();

        let mut expected_before = vec!["job3", "test_job1", "test_job2", "test_job3"];
        expected_before.sort();

        assert_eq!(
            found_before, expected_before,
            "All task identifiers should be present before cleanup"
        );

        utils
            .cleanup(&[CleanupTask::GcTaskIdentifiers])
            .await
            .expect("Failed to cleanup task identifiers");

        let mut found_after = test_db.get_tasks().await;
        found_after.sort();

        // "test_job2" is the identifier expected to have been collected.
        let mut expected_after = vec!["job3", "test_job1", "test_job3"];
        expected_after.sort();

        assert_eq!(
            found_after, expected_after,
            "Only relevant task identifiers should remain after cleanup"
        );
    })
    .await;
}

/// Checks that the `GcTaskIdentifiers` cleanup leaves the task id of a
/// handler registered on the worker unchanged, even when no job for it is
/// queued, and that a job added afterwards still runs.
#[tokio::test]
async fn gc_task_identifiers_preserves_known_identifiers_for_horizontal_scaling() {
    // Counts how many times the handler below has been invoked.
    static CALL_COUNT: StaticCounter = StaticCounter::new();

    #[derive(Serialize, Deserialize)]
    struct RefreshTestJob;

    impl TaskHandler for RefreshTestJob {
        const IDENTIFIER: &'static str = "refresh_test_job";

        // The handler's only effect is to bump the shared counter.
        async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
            CALL_COUNT.increment().await;
        }
    }

    with_test_db(|test_db| async move {
        let options = test_db
            .create_worker_options()
            .define_job::<RefreshTestJob>();
        let worker = options.init().await.expect("Failed to create worker");
        let utils = worker.create_utils();

        // First round: queue a job and let the worker drain the queue.
        utils
            .add_job(RefreshTestJob, JobSpec::default())
            .await
            .expect("Failed to add first job");
        worker.run_once().await.expect("Failed to run worker");

        assert_eq!(CALL_COUNT.get().await, 1, "First job should have run");
        assert!(
            test_db.get_jobs().await.is_empty(),
            "Job should be completed"
        );

        // Record the task id on either side of the cleanup; no jobs are
        // queued at this point (asserted just above).
        let id_before_gc = test_db.get_task_id("refresh_test_job").await;

        utils
            .cleanup(&[CleanupTask::GcTaskIdentifiers])
            .await
            .expect("Failed to cleanup");

        let id_after_gc = test_db.get_task_id("refresh_test_job").await;
        assert_eq!(
            id_before_gc, id_after_gc,
            "Task ID should stay the same (preserved for horizontal scaling safety)"
        );

        // Second round: the same worker must still pick up a newly added job.
        utils
            .add_job(RefreshTestJob, JobSpec::default())
            .await
            .expect("Failed to add second job");
        worker
            .run_once()
            .await
            .expect("Failed to run worker second time");

        assert_eq!(CALL_COUNT.get().await, 2, "Second job should have run");
        assert!(
            test_db.get_jobs().await.is_empty(),
            "Second job should be completed"
        );
    })
    .await;
}