//! graphile_worker_utils 0.1.1
//!
//! Worker utility helpers for graphile_worker.
use graphile_worker_database::{DbExecutor, DbValue};
use indoc::formatdoc;

use super::client::WorkerUtils;
use super::types::RescheduleJobOptions;
use graphile_worker_job::DbJob;
use graphile_worker_queries::errors::GraphileWorkerError;
use graphile_worker_queries::schema_names::WorkerFunction;

/// Invokes the schema-qualified `RemoveJob` worker function for `job_key`.
///
/// The key is bound as the single `text` parameter of the statement. Whatever
/// the statement execution reports back is discarded; only success or failure
/// is surfaced to the caller.
///
/// # Errors
///
/// Returns the error raised by the database executor, converted into
/// [`GraphileWorkerError`].
pub(super) async fn remove_job(
    utils: &WorkerUtils,
    job_key: &str,
) -> Result<(), GraphileWorkerError> {
    let function_name = WorkerFunction::RemoveJob.qualified(&utils.schema);
    let statement = formatdoc!(
        r#"
            select * from {remove_job}($1::text);
        "#,
        remove_job = function_name
    );
    let params = vec![DbValue::Text(String::from(job_key))];

    utils.database.execute(&statement, params.into()).await?;

    Ok(())
}

/// Invokes the schema-qualified `CompleteJobs` worker function for `ids`.
///
/// The ids are bound as one `bigint[]` parameter, and the rows produced by the
/// call are decoded into [`DbJob`] values.
///
/// # Errors
///
/// Returns a [`GraphileWorkerError`] when the query fails or a returned row
/// cannot be converted into a [`DbJob`].
pub(super) async fn complete_jobs(
    utils: &WorkerUtils,
    ids: &[i64],
) -> Result<Vec<DbJob>, GraphileWorkerError> {
    let function_name = WorkerFunction::CompleteJobs.qualified(&utils.schema);
    let statement = formatdoc!(
        r#"
            select * from {complete_jobs}($1::bigint[]);
        "#,
        complete_jobs = function_name
    );
    let params = vec![DbValue::I64Array(Vec::from(ids))];

    fetch_db_jobs(utils, &statement, params).await
}

/// Invokes the schema-qualified `PermanentlyFailJobs` worker function.
///
/// `ids` is bound as a `bigint[]` parameter and `reason` as a `text`
/// parameter, in that order. The rows produced by the call are decoded into
/// [`DbJob`] values.
///
/// # Errors
///
/// Returns a [`GraphileWorkerError`] when the query fails or a returned row
/// cannot be converted into a [`DbJob`].
pub(super) async fn permanently_fail_jobs(
    utils: &WorkerUtils,
    ids: &[i64],
    reason: &str,
) -> Result<Vec<DbJob>, GraphileWorkerError> {
    let function_name = WorkerFunction::PermanentlyFailJobs.qualified(&utils.schema);
    let statement = formatdoc!(
        r#"
            select * from {permanently_fail_jobs}($1::bigint[], $2::text);
        "#,
        permanently_fail_jobs = function_name
    );

    let job_ids = DbValue::I64Array(Vec::from(ids));
    let failure_reason = DbValue::Text(String::from(reason));

    fetch_db_jobs(utils, &statement, vec![job_ids, failure_reason]).await
}

/// Invokes the schema-qualified `RescheduleJobs` worker function.
///
/// `ids` is passed positionally as a `bigint[]`; the fields of `options` are
/// passed as the named arguments `run_at`, `priority`, `attempts` and
/// `max_attempts`. Each option is forwarded as an optional value, so unset
/// fields are bound as-is rather than omitted from the call. The rows
/// produced by the call are decoded into [`DbJob`] values.
///
/// # Errors
///
/// Returns a [`GraphileWorkerError`] when the query fails or a returned row
/// cannot be converted into a [`DbJob`].
pub(super) async fn reschedule_jobs(
    utils: &WorkerUtils,
    ids: &[i64],
    options: RescheduleJobOptions,
) -> Result<Vec<DbJob>, GraphileWorkerError> {
    let function_name = WorkerFunction::RescheduleJobs.qualified(&utils.schema);
    let statement = formatdoc!(
        r#"
            select * from {reschedule_jobs}(
                $1::bigint[],
                run_at := $2::timestamptz,
                priority := $3::int,
                attempts := $4::int,
                max_attempts := $5::int
            );
        "#,
        reschedule_jobs = function_name
    );

    // Parameter order must line up with the $1..$5 placeholders above.
    let job_ids = DbValue::I64Array(Vec::from(ids));
    let run_at = DbValue::TimestampTzOpt(options.run_at);
    let priority = DbValue::I32Opt(options.priority.map(i32::from));
    let attempts = DbValue::I32Opt(options.attempts.map(i32::from));
    let max_attempts = DbValue::I32Opt(options.max_attempts.map(i32::from));
    let params = vec![job_ids, run_at, priority, attempts, max_attempts];

    fetch_db_jobs(utils, &statement, params).await
}

/// Invokes the schema-qualified `ForceUnlockWorkers` worker function.
///
/// The worker ids are copied into owned strings and bound as one `text[]`
/// parameter. Whatever the statement execution reports back is discarded.
///
/// # Errors
///
/// Returns the error raised by the database executor, converted into
/// [`GraphileWorkerError`].
pub(super) async fn force_unlock_workers(
    utils: &WorkerUtils,
    worker_ids: &[&str],
) -> Result<(), GraphileWorkerError> {
    let function_name = WorkerFunction::ForceUnlockWorkers.qualified(&utils.schema);
    let statement = formatdoc!(
        r#"
            select * from {force_unlock_workers}($1::text[]);
        "#,
        force_unlock_workers = function_name
    );

    let owned_ids = worker_ids.iter().map(|id| String::from(*id)).collect();
    let params = vec![DbValue::TextArray(owned_ids)];

    utils.database.execute(&statement, params.into()).await?;

    Ok(())
}

/// Runs `sql` with `params` and decodes every returned row into a [`DbJob`].
///
/// Shared by the helpers in this module whose worker functions return job
/// rows.
///
/// # Errors
///
/// Returns a [`GraphileWorkerError`] when the query itself fails, or as soon
/// as the first row fails to convert via `db_job_from_row`.
async fn fetch_db_jobs(
    utils: &WorkerUtils,
    sql: &str,
    params: Vec<DbValue>,
) -> Result<Vec<DbJob>, GraphileWorkerError> {
    let rows = utils.database.fetch_all(sql, params.into()).await?;

    let mut jobs = Vec::new();
    for row in rows.iter() {
        // Stop at the first undecodable row, as the fallible collect did.
        jobs.push(graphile_worker_queries::rows::db_job_from_row(row)?);
    }

    Ok(jobs)
}