graphile_worker 0.13.0

High performance Rust/PostgreSQL job queue (also suitable for getting jobs generated by PostgreSQL triggers/functions out into a different work queue)
Documentation
use graphile_worker::{
    Cron, IntoTaskHandlerResult, JobDefinition, JobSpec, JobStart, TaskHandler, WorkerContext,
    WorkerOptions,
};
use serde::{Deserialize, Serialize};

use crate::helpers::{with_test_db, StaticCounter};

mod helpers;

fn database_url_for_test_db(name: &str) -> String {
    let mut database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set");
    let scheme_end = database_url
        .find("://")
        .expect("DATABASE_URL must have scheme")
        + 3;
    let path_start = database_url[scheme_end..]
        .find('/')
        .map(|offset| scheme_end + offset)
        .expect("DATABASE_URL must include database path");
    let query_start = database_url[path_start..]
        .find('?')
        .map(|offset| path_start + offset)
        .unwrap_or(database_url.len());

    database_url.replace_range(path_start + 1..query_start, name);
    database_url
}

#[derive(Deserialize, Serialize)]
struct BuilderJob;

impl TaskHandler for BuilderJob {
    const IDENTIFIER: &'static str = "builder_job";

    async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {}
}

#[derive(Deserialize, Serialize)]
struct OtherBuilderJob;

impl TaskHandler for OtherBuilderJob {
    const IDENTIFIER: &'static str = "other_builder_job";

    async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {}
}

fn builder_jobs() -> [JobDefinition; 2] {
    [BuilderJob::definition(), OtherBuilderJob::definition()]
}

#[tokio::test]
async fn worker_options_initializes_from_database_url() {
    with_test_db(|test_db| async move {
        let database_url = database_url_for_test_db(&test_db.name);

        let worker = WorkerOptions::default()
            .database_url(&database_url)
            .max_pg_conn(1)
            .listen_os_shutdown_signals(false)
            .with_cron(Cron::every_minute::<BuilderJob>())
            .with_cron("* * * * * builder_job")
            .expect("Failed to add second crontab")
            .use_local_time(true)
            .on(JobStart, |_ctx| async {})
            .define_job::<BuilderJob>()
            .init()
            .await
            .expect("Failed to create worker");

        assert!(*worker.use_local_time());
        assert_eq!(worker.crontabs().len(), 2);
        assert_eq!(worker.concurrency(), &num_cpus::get());
    })
    .await;
}

#[tokio::test]
async fn worker_options_requires_database_url_or_pool() {
    let result = WorkerOptions::default()
        .listen_os_shutdown_signals(false)
        .init()
        .await;

    match result {
        Err(graphile_worker::WorkerBuildError::MissingDatabaseUrl) => {}
        Err(error) => panic!("Expected missing database URL error, got {error:?}"),
        Ok(_) => panic!("Expected worker initialization to fail"),
    }
}

#[tokio::test]
async fn worker_options_accepts_job_definitions() {
    with_test_db(|test_db| async move {
        let worker = test_db
            .create_worker_options()
            .listen_os_shutdown_signals(false)
            .define_jobs(builder_jobs())
            .init()
            .await
            .expect("Failed to create worker");

        assert!(worker.jobs().contains_key("builder_job"));
        assert!(worker.jobs().contains_key("other_builder_job"));
    })
    .await;
}

#[tokio::test]
async fn worker_options_runs_job_definitions() {
    static DEFINITION_RUN_COUNT: StaticCounter = StaticCounter::new();

    #[derive(Deserialize, Serialize)]
    struct DefinitionRunJob {
        value: u32,
    }

    impl TaskHandler for DefinitionRunJob {
        const IDENTIFIER: &'static str = "definition_run_job";

        async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
            assert_eq!(self.value, 42);
            DEFINITION_RUN_COUNT.increment().await;
        }
    }

    DEFINITION_RUN_COUNT.reset().await;

    with_test_db(|test_db| async move {
        let definition = DefinitionRunJob::definition();
        let _handler = definition.handler();

        let worker = test_db
            .create_worker_options()
            .listen_os_shutdown_signals(false)
            .define_jobs([definition])
            .init()
            .await
            .expect("Failed to create worker");

        worker
            .create_utils()
            .add_job(DefinitionRunJob { value: 42 }, JobSpec::default())
            .await
            .expect("Failed to add job");

        worker.run_once().await.expect("Failed to run worker");

        assert_eq!(DEFINITION_RUN_COUNT.get().await, 1);
    })
    .await;
}

#[test]
fn task_handler_definition_exposes_identifier() {
    assert_eq!(BuilderJob::definition().identifier(), "builder_job");
    assert_eq!(
        JobDefinition::of::<OtherBuilderJob>().identifier(),
        "other_builder_job"
    );
}

#[test]
fn worker_options_with_cron_accepts_crontab_text() {
    let _ = WorkerOptions::default()
        .with_cron("* * * * * builder_job")
        .expect("Failed to parse crontab string literal");

    let _ = WorkerOptions::default()
        .with_cron(String::from("* * * * * builder_job"))
        .expect("Failed to parse owned crontab string");
}

#[test]
#[allow(deprecated)]
fn worker_options_deprecated_with_crontab_still_parses_text() {
    let _ = WorkerOptions::default()
        .with_crontab("* * * * * builder_job")
        .expect("Failed to parse deprecated crontab string");
}