//! graphile_worker_queries 0.1.0
//!
//! Database query helpers for graphile_worker.
use std::sync::Arc;
use std::time::Duration;

use graphile_worker_database::{DbExecutorArg, DbParams, DbValue, Schema};
use graphile_worker_job::Job;
use indoc::formatdoc;

use crate::duration::duration_as_millis_i64;
use crate::errors::Result;
use crate::rows::get_required;
use crate::schema_names::{PrivateTable, WorkerFunction};

/// Calls the schema-qualified `RecoverDeadWorkerJobs` database function for
/// the given worker IDs.
///
/// `worker_ids` is bound as a `text[]` parameter and `recovery_delay` is bound
/// as a millisecond count, which the SQL multiplies by `interval '1 millisecond'`
/// before handing it to the function.
///
/// Returns the value of the `recovered_count` column from the single result
/// row. When `worker_ids` is empty no query is issued and `Ok(0)` is returned.
///
/// # Errors
///
/// Propagates any error raised while executing the query or while reading the
/// `recovered_count` column from the returned row.
pub async fn recover_dead_worker_jobs(
    mut executor: impl DbExecutorArg,
    schema: impl Into<Schema>,
    worker_ids: &[String],
    recovery_delay: Duration,
) -> Result<i32> {
    // Nothing to recover for an empty worker list; skip the round trip.
    if worker_ids.is_empty() {
        return Ok(0);
    }

    let schema: Schema = schema.into();
    let recover_dead_worker_jobs = WorkerFunction::RecoverDeadWorkerJobs.qualified(&schema);
    let sql = formatdoc!(
        r#"
            SELECT {recover_dead_worker_jobs}(
                $1::text[],
                $2::bigint * interval '1 millisecond'
            ) AS recovered_count;
        "#
    );

    // $1 = worker IDs, $2 = recovery delay in milliseconds.
    let params = DbParams::from(vec![
        DbValue::TextArray(worker_ids.to_vec()),
        DbValue::I64(duration_as_millis_i64(recovery_delay)),
    ]);

    let row = executor.fetch_one(&sql, params).await?;
    let recovered_count: i32 = get_required(&row, "recovered_count")?;
    Ok(recovered_count)
}

/// Fetches every job whose `locked_by` value matches one of `worker_ids`.
///
/// The query joins the private jobs table with the private tasks table on
/// `tasks.id = jobs.task_id`, selects each job row together with its task
/// identifier, and orders the result by ascending job id. Each row is
/// converted into a [`Job`] wrapped in an [`Arc`].
///
/// When `worker_ids` is empty no query is issued and an empty vector is
/// returned.
///
/// # Errors
///
/// Propagates any error raised while executing the query, and stops at the
/// first row that fails to convert into a job or lacks `task_identifier`.
pub async fn get_locked_jobs_for_recovery(
    mut executor: impl DbExecutorArg,
    schema: impl Into<Schema>,
    worker_ids: &[String],
) -> Result<Vec<Arc<Job>>> {
    // No workers means no rows can match; skip the round trip.
    if worker_ids.is_empty() {
        return Ok(Vec::new());
    }

    let schema: Schema = schema.into();
    let jobs = PrivateTable::Jobs.qualified(&schema);
    let tasks = PrivateTable::Tasks.qualified(&schema);
    let sql = formatdoc!(
        r#"
            SELECT jobs.*, tasks.identifier AS task_identifier
            FROM {jobs} AS jobs
            JOIN {tasks} AS tasks ON tasks.id = jobs.task_id
            WHERE jobs.locked_by = ANY($1::text[])
            ORDER BY jobs.id ASC;
        "#
    );

    let params = DbParams::from(vec![DbValue::TextArray(worker_ids.to_vec())]);
    let rows = executor.fetch_all(&sql, params).await?;

    // Convert rows in query order, bailing out on the first failure.
    let mut locked_jobs = Vec::new();
    for row in rows.iter() {
        let db_job = crate::rows::db_job_from_row(row)?;
        let task_identifier = get_required(row, "task_identifier")?;
        locked_jobs.push(Arc::new(Job::from_db_job(db_job, task_identifier)));
    }
    Ok(locked_jobs)
}