graphile_worker_utils 0.1.1

Worker utility helpers for graphile_worker
Documentation
use std::collections::HashSet;

use graphile_worker_task_handler::TaskHandler;

use graphile_worker_job::Job;
use graphile_worker_job_spec::{JobKeyMode, JobSpec};
use graphile_worker_queries::add_job::batch::add_jobs as insert_jobs;
use graphile_worker_queries::add_job::types::{JobToAdd, RawJobSpec};
use graphile_worker_queries::errors::GraphileWorkerError;
use graphile_worker_queries::task_identifiers::get_tasks_details;

use super::super::client::WorkerUtils;
use super::analyze::analyze_jobs_after_large_batch;
use super::hooks::invoke_before_job_schedule;

pub(crate) async fn add_jobs<T: TaskHandler + Clone>(
    utils: &WorkerUtils,
    jobs: &[(T, &JobSpec)],
) -> Result<Vec<Job>, GraphileWorkerError> {
    if jobs.is_empty() {
        return Ok(vec![]);
    }

    let identifier = T::IDENTIFIER;
    let mut job_inputs = Vec::with_capacity(jobs.len());
    for (payload, spec) in jobs {
        let payload_value = serde_json::to_value(payload)?;
        job_inputs.push((identifier, payload_value, *spec));
    }

    let (jobs_to_add, job_key_preserve_run_at) = prepare_batch_jobs(utils, job_inputs).await?;

    let task_details = utils.task_details.read().await;

    let added_jobs = insert_jobs(
        &utils.database,
        &utils.schema,
        &jobs_to_add,
        &task_details,
        job_key_preserve_run_at,
        utils.use_local_time,
    )
    .await?;
    drop(task_details);

    analyze_jobs_after_large_batch(utils, jobs.len()).await;

    Ok(added_jobs)
}

pub(crate) async fn add_raw_jobs(
    utils: &WorkerUtils,
    jobs: &[RawJobSpec],
) -> Result<Vec<Job>, GraphileWorkerError> {
    if jobs.is_empty() {
        return Ok(vec![]);
    }

    let job_inputs: Vec<_> = jobs
        .iter()
        .map(|job| (job.identifier.as_str(), job.payload.clone(), &job.spec))
        .collect();

    let (jobs_to_add, job_key_preserve_run_at) = prepare_batch_jobs(utils, job_inputs).await?;
    let task_details =
        get_tasks_details(&utils.database, &utils.schema, unique_identifiers(jobs)).await?;

    let added_jobs = insert_jobs(
        &utils.database,
        &utils.schema,
        &jobs_to_add,
        &task_details,
        job_key_preserve_run_at,
        utils.use_local_time,
    )
    .await?;
    drop(task_details);

    analyze_jobs_after_large_batch(utils, jobs.len()).await;

    Ok(added_jobs)
}

async fn prepare_batch_jobs<'a>(
    utils: &WorkerUtils,
    jobs: Vec<(&'a str, serde_json::Value, &'a JobSpec)>,
) -> Result<(Vec<JobToAdd<'a>>, bool), GraphileWorkerError> {
    let mut jobs_to_add = Vec::with_capacity(jobs.len());
    let mut job_key_preserve_run_at = false;

    for (identifier, payload, spec) in jobs {
        let payload = invoke_before_job_schedule(utils, identifier, payload, spec).await?;

        job_key_preserve_run_at |= spec
            .job_key_mode()
            .as_ref()
            .is_some_and(|m| matches!(m, JobKeyMode::PreserveRunAt));

        jobs_to_add.push(JobToAdd {
            identifier,
            payload,
            spec,
        });
    }

    Ok((jobs_to_add, job_key_preserve_run_at))
}

fn unique_identifiers(jobs: &[RawJobSpec]) -> Vec<String> {
    let mut seen_identifiers = HashSet::with_capacity(jobs.len());
    let mut unique_identifiers = Vec::new();
    for job in jobs {
        if seen_identifiers.insert(job.identifier.as_str()) {
            unique_identifiers.push(job.identifier.clone());
        }
    }
    unique_identifiers
}