//! Worker utility helpers for `graphile_worker` (crate `graphile_worker_utils` 0.1.1).

use graphile_worker_task_handler::{BatchTaskHandler, TaskHandler};
use serde::Serialize;

use super::client::WorkerUtils;
use graphile_worker_job::Job;
use graphile_worker_job_spec::JobSpec;
use graphile_worker_queries::add_job::single::add_job as insert_job;
use graphile_worker_queries::errors::GraphileWorkerError;

mod analyze;
mod bulk;
mod hooks;

use hooks::invoke_before_job_schedule;

pub(super) use bulk::{add_jobs, add_raw_jobs};

/// Schedules a single job for the task handler type `T`.
///
/// The job is stored under `T::IDENTIFIER`. `payload` is converted to a
/// `serde_json::Value`, handed to `invoke_before_job_schedule` together with
/// the identifier and `spec`, and whatever value that call returns is the
/// payload passed to the insert query.
///
/// # Errors
///
/// Returns a [`GraphileWorkerError`] if the payload cannot be serialized, if
/// `invoke_before_job_schedule` fails, or if the insert query fails.
pub(super) async fn add_job<T: TaskHandler>(
    utils: &WorkerUtils,
    payload: T,
    spec: JobSpec,
) -> Result<Job, GraphileWorkerError> {
    let serialized = serde_json::to_value(payload)?;
    let task_identifier = T::IDENTIFIER;

    // The hook's return value replaces the serialized payload from here on.
    let final_payload =
        invoke_before_job_schedule(utils, task_identifier, serialized, &spec).await?;

    let insertion = insert_job(
        &utils.database,
        &utils.schema,
        task_identifier,
        final_payload,
        spec,
        utils.use_local_time,
    );
    insertion.await
}

/// Schedules a single job under an explicitly supplied task `identifier`.
///
/// Unlike [`add_job`], the identifier is passed by the caller rather than
/// taken from a task handler type, and `payload` may be any [`Serialize`]
/// value. The payload is converted to a `serde_json::Value`, passed through
/// `invoke_before_job_schedule`, and the value that call returns is inserted.
///
/// # Errors
///
/// Returns a [`GraphileWorkerError`] if the payload cannot be serialized, if
/// `invoke_before_job_schedule` fails, or if the insert query fails.
pub(super) async fn add_raw_job<P>(
    utils: &WorkerUtils,
    identifier: &str,
    payload: P,
    spec: JobSpec,
) -> Result<Job, GraphileWorkerError>
where
    P: Serialize,
{
    let serialized = serde_json::to_value(payload)?;

    // The hook's return value replaces the serialized payload from here on.
    let final_payload = invoke_before_job_schedule(utils, identifier, serialized, &spec).await?;

    let insertion = insert_job(
        &utils.database,
        &utils.schema,
        identifier,
        final_payload,
        spec,
        utils.use_local_time,
    );
    insertion.await
}

/// Schedules one job whose payload is a JSON array built from `payloads`.
///
/// The job is stored under `T::IDENTIFIER`. The whole vector is serialized as
/// a single value, passed through `invoke_before_job_schedule`, and the
/// returned value is validated before being inserted.
///
/// # Errors
///
/// Returns [`GraphileWorkerError::JobScheduleFailed`] when:
/// - `payloads` is empty on entry,
/// - the value returned by `invoke_before_job_schedule` is not a JSON array,
/// - the array returned by `invoke_before_job_schedule` is empty.
///
/// Serialization, hook, and insert failures are propagated as well.
pub(super) async fn add_batch_job<T: BatchTaskHandler>(
    utils: &WorkerUtils,
    payloads: Vec<T>,
    spec: JobSpec,
) -> Result<Job, GraphileWorkerError> {
    /// Error shared by the emptiness checks before and after the hook.
    fn empty_batch() -> GraphileWorkerError {
        GraphileWorkerError::JobScheduleFailed(
            "batch job payload must contain at least one item".to_string(),
        )
    }

    // Reject an empty batch up front, before serializing or calling the hook.
    if payloads.is_empty() {
        return Err(empty_batch());
    }

    let serialized = serde_json::to_value(payloads)?;
    let task_identifier = T::IDENTIFIER;

    // Re-validate what comes back from the hook: it must still be a non-empty array.
    let items = match invoke_before_job_schedule(utils, task_identifier, serialized, &spec).await? {
        serde_json::Value::Array(items) if !items.is_empty() => items,
        serde_json::Value::Array(_) => return Err(empty_batch()),
        _ => {
            return Err(GraphileWorkerError::JobScheduleFailed(
                "before_job_schedule must return a JSON array for batch jobs".to_string(),
            ));
        }
    };

    let insertion = insert_job(
        &utils.database,
        &utils.schema,
        task_identifier,
        serde_json::Value::Array(items),
        spec,
        utils.use_local_time,
    );
    insertion.await
}